This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split_two by this push:
     new ee4e64a  [Feature][JsonSplit-api] refactor method of task save (#6067)
ee4e64a is described below

commit ee4e64a9a05751a5bccd7ef729c97a75d08033c1
Author: JinyLeeChina <[email protected]>
AuthorDate: Tue Aug 31 15:17:06 2021 +0800

    [Feature][JsonSplit-api] refactor method of task save (#6067)
    
    * refactor method of task save
    
    * fix ut
    
    * fix ut
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../api/controller/ProcessInstanceController.java  |   5 +-
 .../apache/dolphinscheduler/api/enums/Status.java  |   1 +
 .../api/service/ProcessInstanceService.java        |   2 +
 .../service/impl/ProcessDefinitionServiceImpl.java | 112 +++++++++++++--------
 .../service/impl/ProcessInstanceServiceImpl.java   |  46 ++++++---
 .../service/impl/TaskDefinitionServiceImpl.java    |  83 +++------------
 .../api/service/ProcessInstanceServiceTest.java    |  20 ++--
 .../api/service/TaskDefinitionServiceImplTest.java |  17 +++-
 .../dolphinscheduler/common/utils/JSONUtils.java   |   1 -
 .../dao/entity/ProcessTaskRelation.java            |  33 +++---
 .../dao/entity/TaskDefinition.java                 |  10 +-
 .../service/process/ProcessService.java            |  88 ++++++++++++++--
 12 files changed, 255 insertions(+), 163 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index f46e7d8..bd1fa85 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -162,6 +162,7 @@ public class ProcessInstanceController extends 
BaseController {
      * @param loginUser login user
      * @param projectCode project code
      * @param taskRelationJson process task relation json
+     * @param taskDefinitionJson taskDefinitionJson
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
@@ -172,6 +173,7 @@ public class ProcessInstanceController extends 
BaseController {
     @ApiOperation(value = "updateProcessInstance", notes = 
"UPDATE_PROCESS_INSTANCE_NOTES")
     @ApiImplicitParams({
         @ApiImplicitParam(name = "taskRelationJson", value = 
"TASK_RELATION_JSON", type = "String"),
+        @ApiImplicitParam(name = "taskDefinitionJson", value = 
"TASK_DEFINITION_JSON", type = "String"),
         @ApiImplicitParam(name = "processInstanceId", value = 
"PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
         @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type 
= "String"),
         @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required 
= true, type = "Boolean"),
@@ -187,6 +189,7 @@ public class ProcessInstanceController extends 
BaseController {
     public Result updateProcessInstance(@ApiIgnore @RequestAttribute(value = 
Constants.SESSION_USER) User loginUser,
                                         @ApiParam(name = "projectCode", value 
= "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                         @RequestParam(value = 
"taskRelationJson", required = true) String taskRelationJson,
+                                        @RequestParam(value = 
"taskDefinitionJson", required = true) String taskDefinitionJson,
                                         @RequestParam(value = 
"processInstanceId") Integer processInstanceId,
                                         @RequestParam(value = "scheduleTime", 
required = false) String scheduleTime,
                                         @RequestParam(value = "syncDefine", 
required = true) Boolean syncDefine,
@@ -195,7 +198,7 @@ public class ProcessInstanceController extends 
BaseController {
                                         @RequestParam(value = "timeout", 
required = false, defaultValue = "0") int timeout,
                                         @RequestParam(value = "tenantCode", 
required = true) String tenantCode) {
         Map<String, Object> result = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 
processInstanceId,
-            taskRelationJson, scheduleTime, syncDefine, globalParams, 
locations, timeout, tenantCode);
+            taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, 
globalParams, locations, timeout, tenantCode);
         return returnDataList(result);
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index ac3293c..ca60fd8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -276,6 +276,7 @@ public enum Status {
     DELETE_TASK_DEFINE_BY_CODE_ERROR(50042, "delete task definition by code 
error", "删除任务定义错误"),
     QUERY_DETAIL_OF_TASK_DEFINITION_ERROR(50043, "query detail of task 
definition error", "查询任务详细信息错误"),
     QUERY_TASK_DEFINITION_LIST_PAGING_ERROR(50044, "query task definition list 
paging error", "分页查询任务定义列表错误"),
+    TASK_DEFINITION_NAME_EXISTED(50045, "task definition name [{0}] already 
exists", "任务定义名称[{0}]已经存在"),
     HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 0190d17..3dbf46d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -116,6 +116,7 @@ public interface ProcessInstanceService {
      * @param loginUser login user
      * @param projectCode project code
      * @param taskRelationJson process task relation json
+     * @param taskDefinitionJson taskDefinitionJson
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
@@ -129,6 +130,7 @@ public interface ProcessInstanceService {
                                               long projectCode,
                                               Integer processInstanceId,
                                               String taskRelationJson,
+                                              String taskDefinitionJson,
                                               String scheduleTime,
                                               Boolean syncDefine,
                                               String globalParams,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index c4f9114..1750098 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -27,7 +27,6 @@ import 
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
-import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.FileUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -61,7 +60,6 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@@ -121,9 +119,6 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     private ProjectService projectService;
 
     @Autowired
-    private TaskDefinitionService taskDefinitionService;
-
-    @Autowired
     private UserMapper userMapper;
 
     @Autowired
@@ -147,21 +142,6 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     @Autowired
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
-    @Autowired
-    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
-    @Autowired
-    TaskDefinitionLogMapper taskDefinitionLogMapper;
-
-    @Autowired
-    private TaskDefinitionMapper taskDefinitionMapper;
-
-    @Autowired
-    private SchedulerService schedulerService;
-
-    @Autowired
-    private TenantMapper tenantMapper;
-
     /**
      * create process definition
      *
@@ -178,7 +158,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @return create result code
      */
     @Override
-    @Transactional(rollbackFor = Exception.class)
+    @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> createProcessDefinition(User loginUser,
                                                        long projectCode,
                                                        String name,
@@ -202,9 +182,13 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
             return result;
         }
-
+        List<TaskDefinitionLog> taskDefinitionLogs = 
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+        createTaskDefinition(result, loginUser, projectCode, 
taskDefinitionLogs, taskDefinitionJson);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
         List<ProcessTaskRelationLog> taskRelationList = 
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson = 
checkTaskRelationList(taskRelationList, taskRelationJson);
+        Map<String, Object> checkRelationJson = 
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
         if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
             return checkRelationJson;
         }
@@ -215,8 +199,6 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        taskDefinitionService.createTaskDefinition(loginUser, projectCode, 
taskDefinitionJson);
-
         long processDefinitionCode;
         try {
             processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
@@ -227,16 +209,59 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         ProcessDefinition processDefinition = new 
ProcessDefinition(projectCode, name, processDefinitionCode, description,
             globalParams, locations, timeout, loginUser.getId(), 
tenant.getId());
 
-        return createProcessDefine(loginUser, result, taskRelationList, 
processDefinition);
+        return createProcessDefine(loginUser, result, taskRelationList, 
processDefinition, taskDefinitionLogs);
+    }
+
+    @Autowired
+    TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private SchedulerService schedulerService;
+
+    @Autowired
+    private TenantMapper tenantMapper;
+
+    private void createTaskDefinition(Map<String, Object> result,
+                                      User loginUser,
+                                      long projectCode,
+                                      List<TaskDefinitionLog> 
taskDefinitionLogs,
+                                      String taskDefinitionJson) {
+        if (taskDefinitionLogs.isEmpty()) {
+            logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
+            putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+            return;
+        }
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+                logger.error("task definition {} parameter invalid", 
taskDefinitionLog.getName());
+                putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, 
taskDefinitionLog.getName());
+                return;
+            }
+            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
+            if (taskDefinition != null) {
+                logger.error("task definition name {} already exists", 
taskDefinitionLog.getName());
+                putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED, 
taskDefinitionLog.getName());
+                return;
+            }
+        }
+        if (processService.saveTaskDefine(loginUser, projectCode, 
taskDefinitionLogs)) {
+            putMsg(result, Status.SUCCESS);
+        } else {
+            putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+        }
     }
 
     private Map<String, Object> createProcessDefine(User loginUser,
                                                     Map<String, Object> result,
                                                     
List<ProcessTaskRelationLog> taskRelationList,
-                                                    ProcessDefinition 
processDefinition) {
+                                                    ProcessDefinition 
processDefinition,
+                                                    List<TaskDefinitionLog> 
taskDefinitionLogs) {
         int insertVersion = processService.saveProcessDefine(loginUser, 
processDefinition, true);
         if (insertVersion > 0) {
-            int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, 
taskRelationList);
+            int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, 
taskRelationList, taskDefinitionLogs);
             if (insertResult > 0) {
                 putMsg(result, Status.SUCCESS);
                 // return processDefinitionCode
@@ -250,7 +275,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         return result;
     }
 
-    private Map<String, Object> 
checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String 
taskRelationJson) {
+    private Map<String, Object> 
checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String 
taskRelationJson, List<TaskDefinitionLog> taskDefinitionLogs) {
         Map<String, Object> result = new HashMap<>();
         try {
             if (taskRelationList == null || taskRelationList.isEmpty()) {
@@ -259,7 +284,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 return result;
             }
 
-            List<TaskNode> taskNodeList = 
processService.transformTask(taskRelationList);
+            List<TaskNode> taskNodeList = 
processService.transformTask(taskRelationList, taskDefinitionLogs);
             if (taskNodeList.size() != taskRelationList.size()) {
                 Set<Long> postTaskCodes = 
taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
                 Set<Long> taskNodeCodes = 
taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
@@ -276,7 +301,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
             // check whether the task relation json is normal
             for (ProcessTaskRelationLog processTaskRelationLog : 
taskRelationList) {
-                if (processTaskRelationLog.getPostTaskCode() == 0 || 
processTaskRelationLog.getPostTaskVersion() == 0) {
+                if (processTaskRelationLog.getPostTaskCode() == 0) {
                     logger.error("the post_task_code or post_task_version 
can't be zero");
                     putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
                     return result;
@@ -419,7 +444,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @param taskDefinitionJson taskDefinitionJson
      * @return update result code
      */
-    @Transactional(rollbackFor = Exception.class)
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> updateProcessDefinition(User loginUser,
                                                        long projectCode,
@@ -439,8 +464,13 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
+        List<TaskDefinitionLog> taskDefinitionLogs = 
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+        createTaskDefinition(result, loginUser, projectCode, 
taskDefinitionLogs, taskDefinitionJson);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
         List<ProcessTaskRelationLog> taskRelationList = 
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson = 
checkTaskRelationList(taskRelationList, taskRelationJson);
+        Map<String, Object> checkRelationJson = 
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
         if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
             return checkRelationJson;
         }
@@ -470,20 +500,20 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 return result;
             }
         }
-        taskDefinitionService.createTaskDefinition(loginUser, projectCode, 
taskDefinitionJson);
         processDefinition.set(projectCode, name, description, globalParams, 
locations, timeout, tenant.getId());
-        return updateProcessDefine(loginUser, result, taskRelationList, 
processDefinition);
+        return updateProcessDefine(loginUser, result, taskRelationList, 
processDefinition, taskDefinitionLogs);
     }
 
     private Map<String, Object> updateProcessDefine(User loginUser,
                                                     Map<String, Object> result,
                                                     
List<ProcessTaskRelationLog> taskRelationList,
-                                                    ProcessDefinition 
processDefinition) {
+                                                    ProcessDefinition 
processDefinition,
+                                                    List<TaskDefinitionLog> 
taskDefinitionLogs) {
         processDefinition.setUpdateTime(new Date());
         int insertVersion = processService.saveProcessDefine(loginUser, 
processDefinition, true);
         if (insertVersion > 0) {
             int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
-                processDefinition.getCode(), insertVersion, taskRelationList);
+                processDefinition.getCode(), insertVersion, taskRelationList, 
taskDefinitionLogs);
             if (insertResult > 0) {
                 putMsg(result, Status.SUCCESS);
                 result.put(Constants.DATA_LIST, processDefinition);
@@ -818,7 +848,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
             processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
         });
-        Map<String, Object> createProcessResult = 
createProcessDefine(loginUser, result, taskRelationList, processDefinition);
+        Map<String, Object> createProcessResult = 
createProcessDefine(loginUser, result, taskRelationList, processDefinition, 
null);
         if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) {
             putMsg(createProcessResult, Status.SUCCESS);
         } else {
@@ -894,7 +924,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
             List<ProcessTaskRelationLog> taskRelationList = 
JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class);
             // Check whether the task node is normal
-            List<TaskNode> taskNodes = 
processService.transformTask(taskRelationList);
+            List<TaskNode> taskNodes = 
processService.transformTask(taskRelationList, Lists.newArrayList());
 
             if (CollectionUtils.isEmpty(taskNodes)) {
                 logger.error("process node info is empty");
@@ -1254,9 +1284,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             processDefinition.setProjectCode(targetProjectCode);
             if (isCopy) {
                 processDefinition.setName(processDefinition.getName() + 
"_copy_" + DateUtils.getCurrentTimeStamp());
-                createProcessDefine(loginUser, result, taskRelationList, 
processDefinition);
+                createProcessDefine(loginUser, result, taskRelationList, 
processDefinition, Lists.newArrayList());
             } else {
-                updateProcessDefine(loginUser, result, taskRelationList, 
processDefinition);
+                updateProcessDefine(loginUser, result, taskRelationList, 
processDefinition, Lists.newArrayList());
             }
             if (result.get(Constants.STATUS) != Status.SUCCESS) {
                 failedProcessList.add(processDefinition.getCode() + "[" + 
processDefinition.getName() + "]");
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 9a6fa3e..9007e00 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -401,6 +402,7 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
      * @param loginUser login user
      * @param projectCode project code
      * @param taskRelationJson process task relation json
+     * @param taskDefinitionJson taskDefinitionJson
      * @param processInstanceId process instance id
      * @param scheduleTime schedule time
      * @param syncDefine sync define
@@ -413,7 +415,7 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
     @Transactional
     @Override
     public Map<String, Object> updateProcessInstance(User loginUser, long 
projectCode, Integer processInstanceId, String taskRelationJson,
-                                                     String scheduleTime, 
Boolean syncDefine, String globalParams,
+                                                     String 
taskDefinitionJson, String scheduleTime, Boolean syncDefine, String 
globalParams,
                                                      String locations, int 
timeout, String tenantCode) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
@@ -433,26 +435,42 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
                 processInstance.getName(), 
processInstance.getState().toString(), "update");
             return result;
         }
-        ProcessDefinition processDefinition = 
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
-        List<ProcessTaskRelationLog> taskRelationList = 
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        //check workflow json is valid
-        result = 
processDefinitionService.checkProcessNodeList(taskRelationJson);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
-        Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
-        if (tenant == null) {
-            putMsg(result, Status.TENANT_NOT_EXIST);
-            return result;
-        }
         setProcessInstance(processInstance, tenantCode, scheduleTime, 
globalParams, timeout);
         if (Boolean.TRUE.equals(syncDefine)) {
+            List<TaskDefinitionLog> taskDefinitionLogs = 
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+            if (taskDefinitionLogs.isEmpty()) {
+                putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+                return result;
+            }
+            for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+                if 
(!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+                    putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, 
taskDefinitionLog.getName());
+                    return result;
+                }
+            }
+            if (!processService.saveTaskDefine(loginUser, projectCode, 
taskDefinitionLogs)) {
+                putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+                return result;
+            }
+            ProcessDefinition processDefinition = 
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+            List<ProcessTaskRelationLog> taskRelationList = 
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+            //check workflow json is valid
+            result = 
processDefinitionService.checkProcessNodeList(taskRelationJson);
+            if (result.get(Constants.STATUS) != Status.SUCCESS) {
+                return result;
+            }
+            Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+            if (tenant == null) {
+                putMsg(result, Status.TENANT_NOT_EXIST);
+                return result;
+            }
+
             processDefinition.set(projectCode, processDefinition.getName(), 
processDefinition.getDescription(), globalParams, locations, timeout, 
tenant.getId());
             processDefinition.setUpdateTime(new Date());
             int insertVersion = processService.saveProcessDefine(loginUser, 
processDefinition, false);
             if (insertVersion > 0) {
                 int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
-                    processDefinition.getCode(), insertVersion, 
taskRelationList);
+                    processDefinition.getCode(), insertVersion, 
taskRelationList, taskDefinitionLogs);
                 if (insertResult > 0) {
                     putMsg(result, Status.SUCCESS);
                     result.put(Constants.DATA_LIST, processDefinition);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 1a078dc..35f3f1e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -111,85 +111,28 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
             return result;
         }
-        int totalSuccessNumber = 0;
-        List<Long> totalSuccessCode = new ArrayList<>();
-        Date now = new Date();
-        List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
-        List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
         for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
             if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
                 logger.error("task definition {} parameter invalid", 
taskDefinitionLog.getName());
                 putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, 
taskDefinitionLog.getName());
                 return result;
             }
-            taskDefinitionLog.setProjectCode(projectCode);
-            taskDefinitionLog.setUpdateTime(now);
-            taskDefinitionLog.setOperateTime(now);
-            taskDefinitionLog.setOperator(loginUser.getId());
-            if (taskDefinitionLog.getCode() > 0 && 
taskDefinitionLog.getVersion() > 0) {
-                TaskDefinitionLog definitionCodeAndVersion = 
taskDefinitionLogMapper
-                    
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), 
taskDefinitionLog.getVersion());
-                if (definitionCodeAndVersion != null) {
-                    if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
-                        
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
-                        Integer version = 
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
-                        if (version == null || version == 0) {
-                            putMsg(result, Status.DATA_IS_NOT_VALID, 
taskDefinitionLog.getCode());
-                            return result;
-                        }
-                        taskDefinitionLog.setVersion(version + 1);
-                        
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
-                        updateTaskDefinitionLogs.add(taskDefinitionLog);
-                        totalSuccessCode.add(taskDefinitionLog.getCode());
-                    }
-                    continue;
-                }
-            }
-            taskDefinitionLog.setUserId(loginUser.getId());
-            taskDefinitionLog.setVersion(1);
-            taskDefinitionLog.setCreateTime(now);
-            if (taskDefinitionLog.getCode() == 0) {
-                long code;
-                try {
-                    code = SnowFlakeUtils.getInstance().nextId();
-                    taskDefinitionLog.setVersion(1);
-                    taskDefinitionLog.setCode(code);
-                } catch (SnowFlakeException e) {
-                    logger.error("Task code get error, ", e);
-                    putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error 
generating task definition code");
-                    return result;
-                }
-            }
-            totalSuccessCode.add(taskDefinitionLog.getCode());
-            newTaskDefinitionLogs.add(taskDefinitionLog);
-            totalSuccessNumber++;
-        }
-        for (TaskDefinitionLog taskDefinitionToUpdate : 
updateTaskDefinitionLogs) {
-            TaskDefinition task = 
taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
-            if (task == null) {
-                newTaskDefinitionLogs.add(taskDefinitionToUpdate);
-            } else {
-                int update = 
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
-                int insert = 
taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
-                if ((update & insert) != 1) {
-                    putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
-                    return result;
-                }
-            }
-        }
-        if (!newTaskDefinitionLogs.isEmpty()) {
-            int insert = 
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
-            int logInsert = 
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
-            if ((logInsert & insert) == 0) {
-                putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
+            if (taskDefinition != null) {
+                logger.error("task definition name {} already exists", 
taskDefinitionLog.getName());
+                putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED, 
taskDefinitionLog.getName());
                 return result;
             }
         }
-        Map<String, Object> resData = new HashMap<>();
-        resData.put("total", totalSuccessNumber);
-        resData.put("code", totalSuccessCode);
-        putMsg(result, Status.SUCCESS);
-        result.put(Constants.DATA_LIST, resData);
+        if (processService.saveTaskDefine(loginUser, projectCode, 
taskDefinitionLogs)) {
+            Map<String, Object> resData = new HashMap<>();
+            resData.put("total", taskDefinitionLogs.size());
+            resData.put("code", 
StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()),
 ","));
+            putMsg(result, Status.SUCCESS);
+            result.put(Constants.DATA_LIST, resData);
+        } else {
+            putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
+        }
         return result;
     }
 
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 820d5fb..fa68525 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -116,6 +116,14 @@ public class ProcessInstanceServiceTest {
         + 
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
         + 
"\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
 
+    private String taskJson = 
"[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+        + "\"localParams\":[],\"rawScript\":\"echo 
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+        + 
"\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+        + 
"\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+        + 
"\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
 2\",\"conditionResult\":{\"successNode\""
+        + 
":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+        + 
"\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
+
     @Test
     public void testQueryProcessInstanceList() {
         long projectCode = 1L;
@@ -372,7 +380,7 @@ public class ProcessInstanceServiceTest {
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
         Map<String, Object> proejctAuthFailRes = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+            shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
         Assert.assertEquals(Status.PROJECT_NOT_FOUNT, 
proejctAuthFailRes.get(Constants.STATUS));
 
         //process instance null
@@ -382,7 +390,7 @@ public class ProcessInstanceServiceTest {
         when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode)).thenReturn(result);
         when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
         Map<String, Object> processInstanceNullRes = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+            shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
         Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, 
processInstanceNullRes.get(Constants.STATUS));
 
         //process instance not finish
@@ -390,7 +398,7 @@ public class ProcessInstanceServiceTest {
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceNotFinishRes = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+            shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
         Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, 
processInstanceNotFinishRes.get(Constants.STATUS));
 
         //process instance finish
@@ -410,15 +418,15 @@ public class ProcessInstanceServiceTest {
         
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceFinishRes = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", true, "", "", 0, "root");
-        Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, 
processInstanceFinishRes.get(Constants.STATUS));
+            shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, 
"root");
+        Assert.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR, 
processInstanceFinishRes.get(Constants.STATUS));
 
         //success
         
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
         putMsg(result, Status.SUCCESS, projectCode);
 
         Map<String, Object> successRes = 
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-            shellJson, "2020-02-21 00:00:00", false, "", "", 0, "root");
+            shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0, 
"root");
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
 
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 72dbe39..18d4214 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -95,9 +95,8 @@ public class TaskDefinitionServiceImplTest {
             + 
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
             + 
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
             + 
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
-        List<TaskDefinition> taskDefinitions = 
JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class);
-        
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
-        
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
+        List<TaskDefinitionLog> taskDefinitions = 
JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
+        Mockito.when(processService.saveTaskDefine(loginUser, projectCode, 
taskDefinitions)).thenReturn(true);
         Map<String, Object> relation = taskDefinitionService
             .createTaskDefinition(loginUser, projectCode, 
createTaskDefinitionJson);
         Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
@@ -250,11 +249,23 @@ public class TaskDefinitionServiceImplTest {
             + 
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
         List<TaskDefinitionLog> taskDefinitionLogs = 
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
         Assert.assertFalse(taskDefinitionLogs.isEmpty());
+        String taskJson = 
"[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+            + "\"localParams\":[],\"rawScript\":\"echo 
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+            + 
"\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+            + 
"\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+            + 
"\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
 2\",\"conditionResult\":{\"successNode\""
+            + 
":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+            + 
"\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
+        taskDefinitionLogs = JSONUtils.toList(taskJson, 
TaskDefinitionLog.class);
+        Assert.assertFalse(taskDefinitionLogs.isEmpty());
         String taskParams = 
"{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
             + "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo 
${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
             + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}";
         ShellParameters parameters = JSONUtils.parseObject(taskParams, 
ShellParameters.class);
         Assert.assertNotNull(parameters);
+        String params = 
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
+        ShellParameters parameters1 = JSONUtils.parseObject(params, 
ShellParameters.class);
+        Assert.assertNotNull(parameters1);
     }
 
     @Test
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 2a1e2ac..0b1edec 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -159,7 +159,6 @@ public class JSONUtils {
         }
 
         try {
-
             CollectionType listType = 
objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
             return objectMapper.readValue(json, listType);
         } catch (Exception e) {
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 1dae5d8..27162d0 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.dao.entity;
 
 import org.apache.dolphinscheduler.common.enums.ConditionType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.util.Date;
 
@@ -25,6 +26,8 @@ import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
  * process task relation
@@ -86,6 +89,8 @@ public class ProcessTaskRelation {
     /**
      * condition parameters
      */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
     private String conditionParams;
 
     /**
@@ -236,19 +241,19 @@ public class ProcessTaskRelation {
     @Override
     public String toString() {
         return "ProcessTaskRelation{"
-                + "id=" + id
-                + ", name='" + name + '\''
-                + ", processDefinitionVersion=" + processDefinitionVersion
-                + ", projectCode=" + projectCode
-                + ", processDefinitionCode=" + processDefinitionCode
-                + ", preTaskCode=" + preTaskCode
-                + ", preTaskVersion=" + preTaskVersion
-                + ", postTaskCode=" + postTaskCode
-                + ", postTaskVersion=" + postTaskVersion
-                + ", conditionType=" + conditionType
-                + ", conditionParams='" + conditionParams + '\''
-                + ", createTime=" + createTime
-                + ", updateTime=" + updateTime
-                + '}';
+            + "id=" + id
+            + ", name='" + name + '\''
+            + ", processDefinitionVersion=" + processDefinitionVersion
+            + ", projectCode=" + projectCode
+            + ", processDefinitionCode=" + processDefinitionCode
+            + ", preTaskCode=" + preTaskCode
+            + ", preTaskVersion=" + preTaskVersion
+            + ", postTaskCode=" + postTaskCode
+            + ", postTaskVersion=" + postTaskVersion
+            + ", conditionType=" + conditionType
+            + ", conditionParams='" + conditionParams + '\''
+            + ", createTime=" + createTime
+            + ", updateTime=" + updateTime
+            + '}';
     }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index a8a5ccd..1e479c8 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -38,6 +38,8 @@ import com.baomidou.mybatisplus.annotation.TableName;
 import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
  * task definition
@@ -89,6 +91,8 @@ public class TaskDefinition {
     /**
      * user defined parameters
      */
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
     private String taskParams;
 
     /**
@@ -438,12 +442,6 @@ public class TaskDefinition {
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(name, description, taskType, taskParams, flag, 
taskPriority, workerGroup, failRetryTimes,
-            failRetryInterval, timeoutFlag, timeoutNotifyStrategy, timeout, 
delayTime, resourceIds);
-    }
-
-    @Override
     public String toString() {
         return "TaskDefinition{"
             + "id=" + id
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 48c144d..9f0f312 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -57,6 +57,8 @@ import 
org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import 
org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
@@ -123,6 +125,7 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
+import com.facebook.presto.jdbc.internal.guava.collect.Lists;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -2090,6 +2093,63 @@ public class ProcessService {
         return StringUtils.join(resourceIds, ",");
     }
 
+    public boolean saveTaskDefine(User operator, long projectCode, 
List<TaskDefinitionLog> taskDefinitionLogs) {
+        Date now = new Date();
+        List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
+        List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+            taskDefinitionLog.setProjectCode(projectCode);
+            taskDefinitionLog.setUpdateTime(now);
+            taskDefinitionLog.setOperateTime(now);
+            taskDefinitionLog.setOperator(operator.getId());
+            
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
+            if (taskDefinitionLog.getCode() > 0 && 
taskDefinitionLog.getVersion() > 0) {
+                TaskDefinitionLog definitionCodeAndVersion = 
taskDefinitionLogMapper
+                    
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), 
taskDefinitionLog.getVersion());
+                if (definitionCodeAndVersion != null) {
+                    if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
+                        
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
+                        Integer version = 
taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
+                        taskDefinitionLog.setVersion(version + 1);
+                        
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
+                        updateTaskDefinitionLogs.add(taskDefinitionLog);
+                    }
+                    continue;
+                }
+            }
+            taskDefinitionLog.setUserId(operator.getId());
+            taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
+            taskDefinitionLog.setCreateTime(now);
+            if (taskDefinitionLog.getCode() == 0) {
+                try {
+                    
taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
+                } catch (SnowFlakeException e) {
+                    logger.error("Task code get error, ", e);
+                    return false;
+                }
+            }
+            newTaskDefinitionLogs.add(taskDefinitionLog);
+        }
+        for (TaskDefinitionLog taskDefinitionToUpdate : 
updateTaskDefinitionLogs) {
+            TaskDefinition task = 
taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
+            if (task == null) {
+                newTaskDefinitionLogs.add(taskDefinitionToUpdate);
+            } else {
+                int update = 
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+                int insert = 
taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
+                if ((update & insert) != 1) {
+                    return false;
+                }
+            }
+        }
+        if (!newTaskDefinitionLogs.isEmpty()) {
+            int insert = 
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
+            int logInsert = 
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+            return (logInsert & insert) != 0;
+        }
+        return true;
+    }
+
     /**
      * save processDefinition (including create or update processDefinition)
      */
@@ -2116,21 +2176,33 @@ public class ProcessService {
      * save task relations
      */
     public int saveTaskRelation(User operator, long projectCode, long 
processDefinitionCode, int processDefinitionVersion,
-                                List<ProcessTaskRelationLog> taskRelationList) 
{
+                                List<ProcessTaskRelationLog> taskRelationList, 
List<TaskDefinitionLog> taskDefinitionLogs) {
         List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinitionCode);
         if (!processTaskRelationList.isEmpty()) {
             processTaskRelationMapper.deleteByCode(projectCode, 
processDefinitionCode);
         }
+        Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
+        if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogMap = taskDefinitionLogs.stream()
+                .collect(Collectors.toMap(TaskDefinition::getCode, 
taskDefinitionLog -> taskDefinitionLog));
+        }
         Date now = new Date();
-        taskRelationList.forEach(processTaskRelationLog -> {
+        for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) 
{
             processTaskRelationLog.setProjectCode(projectCode);
             
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
             
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
+            if (taskDefinitionLogMap != null) {
+                TaskDefinitionLog taskDefinitionLog = 
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
+                if (taskDefinitionLog != null) {
+                    
processTaskRelationLog.setPreTaskVersion(taskDefinitionLog.getVersion());
+                }
+                
processTaskRelationLog.setPostTaskVersion(taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()).getVersion());
+            }
             processTaskRelationLog.setCreateTime(now);
             processTaskRelationLog.setUpdateTime(now);
             processTaskRelationLog.setOperator(operator.getId());
             processTaskRelationLog.setOperateTime(now);
-        });
+        }
         int result = processTaskRelationMapper.batchInsert(taskRelationList);
         int resultLog = 
processTaskRelationLogMapper.batchInsert(taskRelationList);
         return result & resultLog;
@@ -2162,7 +2234,7 @@ public class ProcessService {
      */
     public DAG<String, TaskNode, TaskNodeRelation> 
genDagGraph(ProcessDefinition processDefinition) {
         List<ProcessTaskRelationLog> processTaskRelations = 
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(),
 processDefinition.getVersion());
-        List<TaskNode> taskNodeList = transformTask(processTaskRelations);
+        List<TaskNode> taskNodeList = transformTask(processTaskRelations, 
Lists.newArrayList());
         ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new 
ArrayList<>(processTaskRelations));
         // Generate concrete Dag to be executed
         return DagHelper.buildDagGraph(processDag);
@@ -2288,7 +2360,7 @@ public class ProcessService {
     /**
      * Use temporarily before refactoring taskNode
      */
-    public List<TaskNode> transformTask(List<ProcessTaskRelationLog> 
taskRelationList) {
+    public List<TaskNode> transformTask(List<ProcessTaskRelationLog> 
taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
         Map<Long, List<Long>> taskCodeMap = new HashMap<>();
         for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
             taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) 
-> {
@@ -2301,7 +2373,9 @@ public class ProcessService {
                 return v;
             });
         }
-        List<TaskDefinitionLog> taskDefinitionLogs = 
genTaskDefineList(taskRelationList);
+        if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
+            taskDefinitionLogs = genTaskDefineList(taskRelationList);
+        }
         Map<Long, TaskDefinitionLog> taskDefinitionLogMap = 
taskDefinitionLogs.stream()
             .collect(Collectors.toMap(TaskDefinitionLog::getCode, 
taskDefinitionLog -> taskDefinitionLog));
         List<TaskNode> taskNodeList = new ArrayList<>();
@@ -2318,7 +2392,7 @@ public class ProcessService {
                 
taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
                 
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
                 Map<String, Object> taskParamsMap = 
taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
-                taskNode.setConditionResult((String) 
taskParamsMap.get(Constants.CONDITION_RESULT));
+                
taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
                 
taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
                 taskParamsMap.remove(Constants.CONDITION_RESULT);
                 taskParamsMap.remove(Constants.DEPENDENCE);

Reply via email to