This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 86ce8f1  [JsonSplit-4417][master/worker/api]json split- remove json in 
task instance. (#5160)
86ce8f1 is described below

commit 86ce8f133a42a2464a4916c6e0b42dc74628c70d
Author: bao liang <[email protected]>
AuthorDate: Sat Apr 3 17:58:04 2021 +0800

    [JsonSplit-4417][master/worker/api]json split- remove json in task 
instance. (#5160)
    
    * [json split] refactor json in task instance
    
    * add task_params in task_instance
    
    * feature #4417 refactor json split in task instance
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * update
    
    * update
    
    * update unit test
    
    * update
---
 .../service/impl/ProcessDefinitionServiceImpl.java |  15 +-
 .../api/service/ProcessDefinitionServiceTest.java  |  33 +---
 .../common/enums/TaskTimeoutStrategy.java          |   1 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  33 ++--
 .../dao/mapper/ProcessTaskRelationLogMapper.java   |   5 +
 .../dao/mapper/TaskDefinitionLogMapper.java        |   7 +-
 .../dao/mapper/ProcessTaskRelationLogMapper.xml    |  10 ++
 .../dao/entity/TaskInstanceTest.java               |  37 +++--
 .../dao/mapper/TaskInstanceMapperTest.java         |   1 -
 .../builder/TaskExecutionContextBuilder.java       |  13 +-
 .../cache/impl/TaskInstanceCacheManagerImpl.java   |   1 -
 .../master/consumer/TaskPriorityQueueConsumer.java |  41 +++--
 .../master/runner/ConditionsTaskExecThread.java    |   2 +-
 .../master/runner/DependentTaskExecThread.java     |   3 +-
 .../master/runner/MasterBaseTaskExecThread.java    |  12 +-
 .../server/master/runner/MasterExecThread.java     |   9 +-
 .../server/worker/runner/TaskExecuteThread.java    |  39 +----
 .../server/master/ConditionsTaskTest.java          |   1 -
 .../server/master/DependentTaskTest.java           |  22 ++-
 .../server/master/SubProcessTaskTest.java          |   3 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    | 182 +--------------------
 .../master/runner/MasterTaskExecThreadTest.java    |   1 -
 .../worker/shell/ShellCommandExecutorTest.java     |   1 -
 .../server/worker/sql/SqlExecutorTest.java         |   4 +-
 .../service/process/ProcessService.java            |  93 +++++++++--
 .../service/process/ProcessServiceTest.java        |   2 +-
 sql/dolphinscheduler_mysql.sql                     |   1 +
 sql/dolphinscheduler_postgre.sql                   |   1 +
 28 files changed, 238 insertions(+), 335 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 319c636..13048c5 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -57,6 +57,8 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
@@ -70,6 +72,8 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.permission.PermissionCheck;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import org.apache.commons.collections.map.HashedMap;
+
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -1249,6 +1253,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
          * List of process instances
          */
         List<ProcessInstance> processInstanceList = 
processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), 
limit);
+        List<TaskDefinitionLog> taskDefinitionList = 
processService.queryTaskDefinitionList(processDefinition.getCode(),
+                processDefinition.getVersion());
+        Map<Long, TaskDefinition> taskDefinitionMap = new HashedMap();
+        taskDefinitionList.forEach(taskDefinitionLog -> 
taskDefinitionMap.put(taskDefinitionLog.getCode(), taskDefinitionLog));
 
         for (ProcessInstance processInstance : processInstanceList) {
             
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(),
 processInstance.getEndTime()));
@@ -1305,11 +1313,10 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                         /**
                          * if process is sub process, the return sub id, or 
sub id=0
                          */
-                        if 
(taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) {
-                            String taskJson = taskInstance.getTaskJson();
-                            taskNode = JSONUtils.parseObject(taskJson, 
TaskNode.class);
+                        if (taskInstance.isSubProcess()) {
+                            TaskDefinition taskDefinition = 
taskDefinitionMap.get(taskInstance.getTaskCode());
                             subProcessId = 
Integer.parseInt(JSONUtils.parseObject(
-                                    
taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
+                                    
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
                         }
                         treeViewDto.getInstances().add(new 
Instance(taskInstance.getId(), taskInstance.getName(), 
taskInstance.getTaskType(), taskInstance.getState().toString()
                                 , taskInstance.getStartTime(), 
taskInstance.getEndTime(), taskInstance.getHost(), 
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), 
subProcessId));
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 234dd4d..9735512 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -837,38 +837,7 @@ public class ProcessDefinitionServiceTest {
         taskInstance.setName("test_task_instance");
         taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setHost("192.168.xx.xx");
-        taskInstance.setTaskJson("{\n"
-                + "  \"conditionResult\": {\n"
-                + "    \"failedNode\": [\n"
-                + "      \"\"\n"
-                + "    ],\n"
-                + "    \"successNode\": [\n"
-                + "      \"\"\n"
-                + "    ]\n"
-                + "  },\n"
-                + "  \"delayTime\": \"0\",\n"
-                + "  \"dependence\": {},\n"
-                + "  \"description\": \"\",\n"
-                + "  \"id\": \"1\",\n"
-                + "  \"maxRetryTimes\": \"0\",\n"
-                + "  \"name\": \"test_task_instance\",\n"
-                + "  \"params\": {\n"
-                + "    \"processDefinitionId\": \"222\",\n"
-                + "    \"resourceList\": []\n"
-                + "  },\n"
-                + "  \"preTasks\": [],\n"
-                + "  \"retryInterval\": \"1\",\n"
-                + "  \"runFlag\": \"NORMAL\",\n"
-                + "  \"taskInstancePriority\": \"MEDIUM\",\n"
-                + "  \"timeout\": {\n"
-                + "    \"enable\": false,\n"
-                + "    \"interval\": null,\n"
-                + "    \"strategy\": \"\"\n"
-                + "  },\n"
-                + "  \"type\": \"SUB_PROCESS\",\n"
-                + "  \"workerGroup\": \"default\"\n"
-                + "}");
-        //task instance exist
+        taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
         
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
         
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new 
DAG<>());
         Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 
10)).thenReturn(processInstanceList);
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
index a8bd325..335b986 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
@@ -57,4 +57,5 @@ public enum TaskTimeoutStrategy {
         }
         throw new IllegalArgumentException("invalid status : " + status);
     }
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 73d4bca..00f2b9a 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
@@ -75,7 +76,7 @@ public class TaskInstance implements Serializable {
     private long taskCode;
 
     /**
-     *  process definition code
+     * process definition code
      */
     private long processDefinitionCode;
 
@@ -184,7 +185,7 @@ public class TaskInstance implements Serializable {
      * dependency
      */
     @TableField(exist = false)
-    private String dependency;
+    private DependentParameters dependency;
 
     /**
      * duration
@@ -235,7 +236,7 @@ public class TaskInstance implements Serializable {
      * varPool string
      */
     private String varPool;
-    
+
     /**
      * executor name
      */
@@ -251,6 +252,11 @@ public class TaskInstance implements Serializable {
      */
     private int delayTime;
 
+    /**
+     * task params
+     */
+    private String taskParams;
+
     public void init(String host, Date startTime, String executePath) {
         this.host = host;
         this.startTime = startTime;
@@ -264,7 +270,7 @@ public class TaskInstance implements Serializable {
     public void setVarPool(String varPool) {
         this.varPool = varPool;
     }
-    
+
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }
@@ -429,15 +435,14 @@ public class TaskInstance implements Serializable {
         this.appLink = appLink;
     }
 
-    public String getDependency() {
-        if (this.dependency != null) {
-            return this.dependency;
+    public DependentParameters getDependency() {
+        if (this.dependency == null) {
+            this.dependency = JSONUtils.parseObject(this.getTaskParams(), 
DependentParameters.class);
         }
-        TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
-        return taskNode == null ? null : taskNode.getDependence();
+        return this.dependency;
     }
 
-    public void setDependency(String dependency) {
+    public void setDependency(DependentParameters dependency) {
         this.dependency = dependency;
     }
 
@@ -644,4 +649,12 @@ public class TaskInstance implements Serializable {
     public void setTaskDefinitionVersion(int taskDefinitionVersion) {
         this.taskDefinitionVersion = taskDefinitionVersion;
     }
+
+    public String getTaskParams() {
+        return taskParams;
+    }
+
+    public void setTaskParams(String taskParams) {
+        this.taskParams = taskParams;
+    }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
index deda046..9183b0f 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
@@ -39,4 +39,9 @@ public interface ProcessTaskRelationLogMapper extends 
BaseMapper<ProcessTaskRela
      */
     List<ProcessTaskRelationLog> 
queryByProcessCodeAndVersion(@Param("processCode") long processCode,
                                                               
@Param("processVersion") int processVersion);
+
+    List<ProcessTaskRelationLog> queryByTaskRelationList(@Param("processCode") 
long processCode,
+                                                         
@Param("processVersion") int processVersion,
+                                                         @Param("taskCode") 
long taskCode,
+                                                         @Param("taskVersion") 
long taskVersion);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index 59ea5c5..1ad40e0 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -59,11 +59,12 @@ public interface TaskDefinitionLogMapper extends 
BaseMapper<TaskDefinitionLog> {
     TaskDefinitionLog 
queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long 
taskDefinitionCode,
                                                       @Param("version") int 
version);
 
+
     /**
-     * query task definition log
      *
-     * @param taskDefinitions taskDefinition collection
-     * @return task definition log
+     * @param taskDefinitions
+     * @return
      */
     List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions") 
Collection<TaskDefinition> taskDefinitions);
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
index b604a1d..e7e4a12 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
@@ -29,4 +29,14 @@
         WHERE process_definition_code = #{processCode}
         and process_definition_version = #{processVersion}
     </select>
+    <select id="queryByTaskRelationList"
+            
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_task_relation_log
+        WHERE process_definition_code = #{processCode}
+        and process_definition_version = #{processVersion}
+        and post_task_code = #{taskCode}
+        and post_task_version = #{taskVersion}
+    </select>
 </mapper>
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
index 5742c95..d2117dc 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.dolphinscheduler.dao.entity;
 
+import org.apache.dolphinscheduler.common.enums.DependentRelation;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,25 +64,33 @@ public class TaskInstanceTest {
         TaskNode taskNode;
 
         taskInstance = new TaskInstance();
-        taskInstance.setTaskJson(null);
         Assert.assertNull(taskInstance.getDependency());
 
         taskInstance = new TaskInstance();
         taskNode = new TaskNode();
         taskNode.setDependence(null);
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
         Assert.assertNull(taskInstance.getDependency());
 
         taskInstance = new TaskInstance();
-        taskNode = new TaskNode();
-        // expect a JSON here, and will be unwrap when toJsonString
-        taskNode.setDependence("\"A\"");
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
-        Assert.assertEquals("A", taskInstance.getDependency());
+        
taskInstance.setTaskParams(JSONUtils.toJsonString(getDependentParameters()));
+        taskInstance.getDependency();
+    }
 
-        taskInstance = new TaskInstance();
-        taskInstance.setTaskJson(null);
-        taskInstance.setDependency("{}");
-        Assert.assertEquals("{}", taskInstance.getDependency());
+    /**
+     *
+     * @return
+     */
+    private DependentParameters getDependentParameters() {
+        DependentParameters dependentParameters = new DependentParameters();
+        List<DependentTaskModel> dependTaskList = new ArrayList<>();
+        List<DependentItem> dependentItems = new ArrayList<>();
+        DependentItem dependentItem = new DependentItem();
+        dependentItem.setDepTasks("A");
+        dependentItem.setDefinitionCode(222L);
+        dependentItem.setCycle("today");
+        dependentItems.add(dependentItem);
+        dependentParameters.setDependTaskList(dependTaskList);
+        dependentParameters.setRelation(DependentRelation.AND);
+        return dependentParameters;
     }
 }
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 6dc348c..ee57a70 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -107,7 +107,6 @@ public class TaskInstanceMapperTest {
         taskInstance.setState(state);
         taskInstance.setStartTime(new Date());
         taskInstance.setEndTime(new Date());
-        taskInstance.setTaskJson("{}");
         taskInstance.setProcessInstanceId(processInstanceId);
         taskInstance.setTaskType(taskType);
         taskInstance.setProcessDefinitionCode(1L);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 7bfd9a0..620100f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.builder;
 
+import static 
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.entity.*;
 
@@ -44,7 +46,6 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setStartTime(taskInstance.getStartTime());
         taskExecutionContext.setTaskType(taskInstance.getTaskType());
         taskExecutionContext.setLogPath(taskInstance.getLogPath());
-        taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
         taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
         taskExecutionContext.setHost(taskInstance.getHost());
         taskExecutionContext.setResources(taskInstance.getResources());
@@ -52,6 +53,16 @@ public class TaskExecutionContextBuilder {
         return this;
     }
 
+    public TaskExecutionContextBuilder 
buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) {
+        int timeoutSeconds = taskDefinition.getTimeout() * 
SEC_2_MINUTES_TIME_UNIT;
+        if (timeoutSeconds >= Integer.MAX_VALUE) {
+            timeoutSeconds = Integer.MAX_VALUE;
+        }
+        
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode());
+        taskExecutionContext.setTaskTimeout(timeoutSeconds);
+        return this;
+    }
+
 
     /**
      * build processInstance related info
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 4d55490..0dc7035 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -76,7 +76,6 @@ public class TaskInstanceCacheManagerImpl implements 
TaskInstanceCacheManager {
         taskInstance.setStartTime(taskExecutionContext.getStartTime());
         taskInstance.setTaskType(taskInstance.getTaskType());
         taskInstance.setExecutePath(taskInstance.getExecutePath());
-        taskInstance.setTaskJson(taskInstance.getTaskJson());
         taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), 
taskInstance);
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 69058a4..d198292 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -145,7 +145,6 @@ public class TaskPriorityQueueConsumer extends Thread {
                         
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                     }
                 }
-
             } catch (Exception e) {
                 logger.error("dispatcher task error", e);
             }
@@ -201,8 +200,6 @@ public class TaskPriorityQueueConsumer extends Thread {
         // task type
         TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
 
-        // task node
-        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), 
TaskNode.class);
 
         Integer userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
         Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
@@ -221,7 +218,7 @@ public class TaskPriorityQueueConsumer extends Thread {
         String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
         
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
         
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
-        taskInstance.setResources(getResourceFullNames(taskNode));
+        taskInstance.setResources(getResourceFullNames(taskInstance));
 
         SQLTaskExecutionContext sqlTaskExecutionContext = new 
SQLTaskExecutionContext();
         DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
@@ -230,21 +227,21 @@ public class TaskPriorityQueueConsumer extends Thread {
 
         // SQL task
         if (taskType == TaskType.SQL) {
-            setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
+            setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
         }
 
         // DATAX task
         if (taskType == TaskType.DATAX) {
-            setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
+            setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
         }
 
         // procedure task
         if (taskType == TaskType.PROCEDURE) {
-            setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
+            setProcedureTaskRelation(procedureTaskExecutionContext, 
taskInstance);
         }
 
         if (taskType == TaskType.SQOOP) {
-            setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
+            setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
         }
 
         return TaskExecutionContextBuilder.get()
@@ -262,10 +259,10 @@ public class TaskPriorityQueueConsumer extends Thread {
      * set procedure task relation
      *
      * @param procedureTaskExecutionContext procedureTaskExecutionContext
-     * @param taskNode                      taskNode
+     * @param taskInstance                      taskInstance
      */
-    private void setProcedureTaskRelation(ProcedureTaskExecutionContext 
procedureTaskExecutionContext, TaskNode taskNode) {
-        ProcedureParameters procedureParameters = 
JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class);
+    private void setProcedureTaskRelation(ProcedureTaskExecutionContext 
procedureTaskExecutionContext, TaskInstance taskInstance) {
+        ProcedureParameters procedureParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
         int datasourceId = procedureParameters.getDatasource();
         DataSource datasource = 
processService.findDataSourceById(datasourceId);
         
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@@ -275,10 +272,10 @@ public class TaskPriorityQueueConsumer extends Thread {
      * set datax task relation
      *
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
-     * @param taskNode                  taskNode
+     * @param taskInstance                  taskInstance
      */
-    protected void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskNode taskNode) {
-        DataxParameters dataxParameters = 
JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
+    protected void setDataxTaskRelation(DataxTaskExecutionContext 
dataxTaskExecutionContext, TaskInstance taskInstance) {
+        DataxParameters dataxParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
 
         DataSource dbSource = 
processService.findDataSourceById(dataxParameters.getDataSource());
         DataSource dbTarget = 
processService.findDataSourceById(dataxParameters.getDataTarget());
@@ -300,10 +297,10 @@ public class TaskPriorityQueueConsumer extends Thread {
      * set sqoop task relation
      *
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
-     * @param taskNode                  taskNode
+     * @param taskInstance              taskInstance
      */
-    private void setSqoopTaskRelation(SqoopTaskExecutionContext 
sqoopTaskExecutionContext, TaskNode taskNode) {
-        SqoopParameters sqoopParameters = 
JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class);
+    private void setSqoopTaskRelation(SqoopTaskExecutionContext 
sqoopTaskExecutionContext, TaskInstance taskInstance) {
+        SqoopParameters sqoopParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
 
         // sqoop job type is template set task relation
         if 
(sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
@@ -331,10 +328,10 @@ public class TaskPriorityQueueConsumer extends Thread {
      * set SQL task relation
      *
      * @param sqlTaskExecutionContext sqlTaskExecutionContext
-     * @param taskNode                taskNode
+     * @param taskInstance                taskInstance
      */
-    private void setSQLTaskRelation(SQLTaskExecutionContext 
sqlTaskExecutionContext, TaskNode taskNode) {
-        SqlParameters sqlParameters = 
JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class);
+    private void setSQLTaskRelation(SQLTaskExecutionContext 
sqlTaskExecutionContext, TaskInstance taskInstance) {
+        SqlParameters sqlParameters = 
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
         int datasourceId = sqlParameters.getDatasource();
         DataSource datasource = 
processService.findDataSourceById(datasourceId);
         
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@@ -381,9 +378,9 @@ public class TaskPriorityQueueConsumer extends Thread {
     /**
      * get resource map key is full name and value is tenantCode
      */
-    protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+    protected Map<String, String> getResourceFullNames(TaskInstance 
taskInstance) {
         Map<String, String> resourcesMap = new HashMap<>();
-        AbstractParameters baseParam = 
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
+        AbstractParameters baseParam = 
TaskParametersUtils.getParameters(taskInstance.getTaskType(), 
taskInstance.getTaskParams());
 
         if (baseParam != null) {
             List<ResourceInfo> projectResourceFiles = 
baseParam.getResourceFilesList();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
index d0b314e..8cf6176 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
@@ -130,7 +130,7 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
         taskInstance.setStartTime(new Date());
         this.processService.saveTaskInstance(taskInstance);
 
-        this.dependentParameters = 
JSONUtils.parseObject(this.taskInstance.getDependency(), 
DependentParameters.class);
+        this.dependentParameters = taskInstance.getDependency();
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
index 9f78e0c..4ef4783 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
@@ -102,8 +102,7 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
      */
     private void initDependParameters() {
 
-        this.dependentParameters = 
JSONUtils.parseObject(this.taskInstance.getDependency(),
-                DependentParameters.class);
+        this.dependentParameters = taskInstance.getDependency();
 
         for(DependentTaskModel taskModel : 
dependentParameters.getDependTaskList()){
             this.dependentTaskList.add(new DependentExecute(
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index be666ae..30aa39b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
-import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -121,10 +122,11 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
      * init task timeout parameters
      */
     private void initTimeoutParams() {
-        String taskJson = taskInstance.getTaskJson();
-        TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
-        taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
-
+        TaskDefinition taskDefinition = 
processService.findTaskDefinition(taskInstance.getTaskCode(), 
taskInstance.getTaskDefinitionVersion());
+        boolean timeoutEnable = taskDefinition.getTimeoutFlag() == 
TimeoutFlag.OPEN;
+        taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable,
+                                                        
taskDefinition.getTimeoutNotifyStrategy(),
+                                                        
taskDefinition.getTimeout());
         if (taskTimeoutParameter.getEnable()) {
             checkTimeoutFlag = true;
         }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 41eb150..4f83958 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -380,7 +380,8 @@ public class MasterExecThread implements Runnable {
      */
     private void buildFlowDag() throws Exception {
         recoverNodeIdList = 
getStartTaskInstanceList(processInstance.getCommandParam());
-        List<TaskNode> taskNodeList = 
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), 
processInstance.getProcessDefinitionVersion(), new HashMap<>());
+        List<TaskNode> taskNodeList =
+                
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), 
processInstance.getProcessDefinitionVersion(), new HashMap<>());
         forbiddenTaskList.clear();
         taskNodeList.stream().forEach(taskNode -> {
             if (taskNode.isForbidden()) {
@@ -485,8 +486,6 @@ public class MasterExecThread implements Runnable {
             taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
             // process instance id
             taskInstance.setProcessInstanceId(processInstance.getId());
-            // task instance node json
-            taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
             // task instance type
             taskInstance.setTaskType(taskNode.getType());
             // task instance whether alert
@@ -556,9 +555,7 @@ public class MasterExecThread implements Runnable {
                 }
             }
             result.put(LOCAL_PARAMS, allParam);
-            taskNode.setParams(JSONUtils.toJsonString(result));
-            // task instance node json
-            taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+            taskInstance.setTaskParams(JSONUtils.toJsonString(result));
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 409c2b7..3263554 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -132,8 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
                 return;
             }
 
-            // task node
-            TaskNode taskNode = 
JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
 
             if (taskExecutionContext.getStartTime() == null) {
                 taskExecutionContext.setStartTime(new Date());
@@ -148,13 +146,10 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
                     taskExecutionContext.getResources(),
                     logger);
 
-            taskExecutionContext.setTaskParams(taskNode.getParams());
+            
taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams());
             taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
             taskExecutionContext.setDefinedParams(getGlobalParamsMap());
 
-            // set task timeout
-            setTaskTimeout(taskExecutionContext, taskNode);
-
             taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
                     taskExecutionContext.getProcessDefineId(),
                     taskExecutionContext.getProcessInstanceId(),
@@ -238,38 +233,6 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
         return globalParamsMap;
     }
 
-    /**
-     * set task timeout
-     * @param taskExecutionContext TaskExecutionContext
-     * @param taskNode
-     */
-    private void setTaskTimeout(TaskExecutionContext taskExecutionContext, 
TaskNode taskNode) {
-        // the default timeout is the maximum value of the integer
-        taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
-        TaskTimeoutParameter taskTimeoutParameter = 
taskNode.getTaskTimeoutParameter();
-        if (taskTimeoutParameter.getEnable()) {
-            // get timeout strategy
-            
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
-            switch (taskTimeoutParameter.getStrategy()) {
-                case WARN:
-                    break;
-                case FAILED:
-                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() 
* 60) {
-                        
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
-                    }
-                    break;
-                case WARNFAILED:
-                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() 
* 60) {
-                        
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
-                    }
-                    break;
-                default:
-                    logger.error("not support task timeout strategy: {}", 
taskTimeoutParameter.getStrategy());
-                    throw new IllegalArgumentException("not support task 
timeout strategy");
-
-            }
-        }
-    }
 
     /**
      * kill task
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index 61058de..9a750e1 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -171,7 +171,6 @@ public class ConditionsTaskTest {
     private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance 
processInstance) {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1000);
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
         taskInstance.setName(taskNode.getName());
         taskInstance.setTaskType(taskNode.getType());
         taskInstance.setProcessInstanceId(processInstance.getId());
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 8b0410b..571e79a 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult;
 import org.apache.dolphinscheduler.common.enums.DependentRelation;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.model.DependentItem;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -60,6 +61,10 @@ public class DependentTaskTest {
      */
     public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
 
+
+    public static final Long TASK_CODE = 1111L;
+    public static final int TASK_VERSION = 1;
+
     private ProcessService processService;
 
     /**
@@ -113,6 +118,9 @@ public class DependentTaskTest {
         Mockito.when(processService
                 .findTaskInstanceById(1000))
                 .thenAnswer(i -> taskInstance);
+
+        Mockito.when(processService.findTaskDefinition(TASK_CODE, 
TASK_VERSION))
+                .thenReturn(getTaskDefinition());
     }
 
     private void testBasicInit() {
@@ -359,14 +367,25 @@ public class DependentTaskTest {
         return taskNode;
     }
 
+    private TaskDefinition getTaskDefinition() {
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setCode(TASK_CODE);
+        taskDefinition.setVersion(TASK_VERSION);
+        taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
+        taskDefinition.setTimeout(0);
+        return taskDefinition;
+    }
+
     private void setupTaskInstance(TaskNode taskNode) {
         taskInstance = new TaskInstance();
         taskInstance.setId(1000);
+        taskInstance.setTaskCode(TASK_CODE);
+        taskInstance.setTaskDefinitionVersion(TASK_VERSION);
         taskInstance.setProcessInstanceId(processInstance.getId());
         
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
         taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
         taskInstance.setTaskType(taskNode.getType());
+        
taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), 
DependentParameters.class));
         taskInstance.setName(taskNode.getName());
     }
 
@@ -405,7 +424,6 @@ public class DependentTaskTest {
         taskInstance.setName(taskName);
         taskInstance.setProcessInstanceId(processInstance.getId());
         
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
-        taskInstance.setTaskJson("{}");
         taskInstance.setState(state);
         return taskInstance;
     }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 518b116..046ca5b 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -148,7 +148,8 @@ public class SubProcessTaskTest {
     private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance 
processInstance) {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1000);
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+        taskInstance.setName("S");
+        taskInstance.setTaskType(TaskType.SUB_PROCESS.toString());
         taskInstance.setName(taskNode.getName());
         taskInstance.setTaskType(taskNode.getType());
         taskInstance.setProcessInstanceId(processInstance.getId());
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 14b1c61..bf2cf4c 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -110,25 +110,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,"
-                + "\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -161,13 +142,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
-                + 
"\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
-                + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from 
t_journey_user\\\",\\\"preStatements\\\":[],"
-                + 
"\\\"sqlType\\\":0,\\\"receivers\\\":\\\"[email protected]\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
-                + 
"\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -212,26 +186,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + 
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
-                + "\"forbidden\":false,\"id\":\"tasks-97625\","
-                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
-                + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
-                + "    \\\"postStatements\\\":[],"
-                + "    \\\"jobSpeedRecord\\\":1000,"
-                + "    \\\"customConfig\\\":0,"
-                + "    \\\"dtType\\\":\\\"MYSQL\\\","
-                + "    \\\"dsType\\\":\\\"MYSQL\\\","
-                + "    \\\"jobSpeedByte\\\":0,"
-                + "    \\\"dataSource\\\":80,"
-                + "    \\\"dataTarget\\\":80,"
-                + "    \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
-                + "    \\\"preStatements\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + 
"\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
-                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"DATAX\","
-                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -274,32 +228,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
-                + "\"forbidden\":false,\"id\":\"tasks-63634\","
-                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
-                + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
-                + "    \\\"targetType\\\":\\\"HDFS\\\","
-                + "    
\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
-                + "        
\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
-                + "        \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
-                + "        \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
-                + "        
\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
-                + "    \\\"modelType\\\":\\\"import\\\","
-                + "    
\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
-                + "        
\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
-                + "        \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
-                + "        \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
-                + "        \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
-                + "        \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
-                + "        
\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
-                + "    \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SQOOP\","
-                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -342,16 +270,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + 
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
-                + "\"forbidden\":false,\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -370,24 +288,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"NoWorkGroup\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("NoWorkGroup");
         taskInstance.setExecutorId(2);
@@ -424,24 +324,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"NoWorkGroup\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("NoWorkGroup");
         taskInstance.setExecutorId(2);
@@ -477,24 +359,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"NoWorkGroup\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("NoWorkGroup");
         taskInstance.setExecutorId(2);
@@ -527,32 +391,12 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"NoWorkGroup\"}");
-
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("NoWorkGroup");
         taskInstance.setExecutorId(2);
         // task node
-        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), 
TaskNode.class);
 
-        Map<String, String>  map = 
taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+        Map<String, String>  map = 
taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
 
         List<Resource> resourcesList = new ArrayList<Resource>();
         Resource resource = new Resource();
@@ -598,15 +442,15 @@ public class TaskPriorityQueueConsumerTest {
     public void testSetDataxTaskRelation() throws Exception {
 
         DataxTaskExecutionContext dataxTaskExecutionContext = new 
DataxTaskExecutionContext();
-        TaskNode taskNode = new TaskNode();
-        taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
         DataSource dataSource = new DataSource();
         dataSource.setId(1);
         dataSource.setConnectionParams("");
         dataSource.setType(DbType.MYSQL);
         
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
 
-        
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+        
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, 
taskInstance);
 
         Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
         Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
@@ -620,24 +464,6 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
-                + "\"conditionsTask\":false,"
-                + "\"depList\":[],"
-                + "\"dependence\":\"{}\","
-                + "\"forbidden\":false,"
-                + "\"id\":\"tasks-55201\","
-                + "\"maxRetryTimes\":0,"
-                + "\"name\":\"测试任务\","
-                + "\"params\":\"{\\\"rawScript\\\":\\\"echo 
\\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
-                + "\"preTasks\":\"[]\","
-                + "\"retryInterval\":1,"
-                + "\"runFlag\":\"NORMAL\","
-                + "\"taskInstancePriority\":\"MEDIUM\","
-                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
-                + "\"timeout\":\"{\\\"enable\\\":false,"
-                + "\\\"strategy\\\":\\\"\\\"}\","
-                + "\"type\":\"SHELL\","
-                + "\"workerGroup\":\"NoWorkGroup\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("NoWorkGroup");
         taskInstance.setExecutorId(2);
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
index 405ad43..9229e59 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
@@ -114,7 +114,6 @@ public class MasterTaskExecThreadTest {
         taskInstance.setTaskType("SHELL");
         taskInstance.setId(252612);
         taskInstance.setName("C");
-        taskInstance.setTaskJson("{}");
         taskInstance.setProcessInstanceId(10111);
         taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
         return taskInstance;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index ac91e1d..e224fa8 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -87,7 +87,6 @@ public class ShellCommandExecutorTest {
 
         TaskInstance taskInstance = processService.findTaskInstanceById(7657);
 
-        String taskJson = taskInstance.getTaskJson();
 //        TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
 //        taskProps.setTaskParams(taskNode.getParams());
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
index dbaa132..caf1551 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
@@ -110,9 +110,7 @@ public class SqlExecutorTest {
 
         TaskInstance taskInstance = 
processService.findTaskInstanceById(taskInstId);
 
-        String taskJson = taskInstance.getTaskJson();
-        TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
-        taskProps.setTaskParams(taskNode.getParams());
+        taskProps.setTaskParams(taskInstance.getTaskParams());
 
 
         // custom logger
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bccd926..2c6349c 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -110,6 +110,8 @@ import 
org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
+import org.apache.commons.collections.map.HashedMap;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -1160,9 +1162,12 @@ public class ProcessService {
                                            ProcessInstanceMap instanceMap,
                                            TaskInstance task) {
         CommandType commandType = getSubCommandType(parentProcessInstance, 
childInstance);
-        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), 
TaskNode.class);
-        Map<String, Object> subProcessParam = 
JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
-        Integer childDefineId = 
Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
+        TaskDefinition taskDefinition = 
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                task.getTaskCode(), task.getTaskDefinitionVersion()
+        );
+        Map<String, String> subProcessParam = 
JSONUtils.toMap(taskDefinition.getTaskParams());
+        Integer childDefineId = 
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
+
         Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
         List<Property> allParam = 
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
         Map<String, String> globalMap = 
this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
@@ -1663,8 +1668,7 @@ public class ProcessService {
         if (row == null || row.size() == 0) {
             return;
         }
-        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), 
TaskNode.class);
-        Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), 
String.class, Object.class);
+        Map<String, Object> taskParams = 
JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
         Object localParams = taskParams.get(LOCAL_PARAMS);
         if (localParams == null) {
             return;
@@ -1689,9 +1693,7 @@ public class ProcessService {
             }
         }
         taskParams.put(LOCAL_PARAMS, allParam);
-        taskNode.setParams(JSONUtils.toJsonString(taskParams));
-        // task instance node json
-        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+        taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
         String params4ProcessString = JSONUtils.toJsonString(params4Property);
         int updateCount = 
this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, 
processInstance.getId());
         logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", 
updateCount, params4ProcessString, processInstance.getId());
@@ -2392,7 +2394,7 @@ public class ProcessService {
                             taskDefinitionMap.get(preTaskName).getVersion(),
                             
taskDefinitionMap.get(taskNode.getName()).getCode(),
                             
taskDefinitionMap.get(taskNode.getName()).getVersion(),
-                            ConditionType.of("none"),
+                            ConditionType.NONE,
                             taskNode.getConditionResult(),
                             now,
                             now));
@@ -2542,6 +2544,77 @@ public class ProcessService {
     }
 
     /**
+     * Build a {@link TaskNode} for a task instance from the task definition log entry
+     * that matches the instance's task code and task definition version.
+     *
+     * @param taskInstance task instance supplying the task code and definition version
+     * @return the populated task node, or null if no matching task definition exists
+     */
+    public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
+        TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                taskInstance.getTaskCode(),
+                taskInstance.getTaskDefinitionVersion());
+        if (taskDefinition == null) {
+            return null;
+        }
+
+        TaskNode taskNode = new TaskNode();
+        taskNode.setCode(String.valueOf(taskDefinition.getCode()));
+        taskNode.setVersion(taskDefinition.getVersion());
+        taskNode.setName(taskDefinition.getName());
+        taskNode.setDesc(taskDefinition.getDescription());
+        taskNode.setType(taskDefinition.getTaskType().getDescp());
+        // NOTE(review): assumes Flag.YES marks an enabled definition (confirm against the
+        // Flag enum), so YES maps to the NORMAL run flag and anything else to FORBIDDEN.
+        taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES
+                ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+        taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
+        taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
+        taskNode.setParams(taskDefinition.getTaskParams());
+        taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
+        taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
+        return taskNode;
+    }
+
+    /**
+     * Look up one version of a task definition in the task definition log.
+     *
+     * @param taskCode task definition code
+     * @param taskDefinitionVersion task definition version
+     * @return the result of the log mapper query for that code and version
+     */
+    public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
+        final TaskDefinition definition =
+                taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
+        return definition;
+    }
+
+    /**
+     * Query the task definitions referenced as post tasks by one version of a process definition.
+     * Each distinct post task code is resolved through the task definition log, using the
+     * version carried by the first relation that names it.
+     *
+     * @param processCode process definition code
+     * @param processVersion process definition version
+     * @return the task definitions found; post task codes without a log entry are left out
+     */
+    public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) {
+        List<ProcessTaskRelationLog> processTaskRelationLogs =
+                processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
+        Map<Long, TaskDefinition> postTaskDefinitionMap = new HashedMap();
+        for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) {
+            Long code = processTaskRelationLog.getPostTaskCode();
+            // a code that is already resolved needs no second query
+            if (postTaskDefinitionMap.containsKey(code)) {
+                continue;
+            }
+            TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                    code, processTaskRelationLog.getPostTaskVersion());
+            // the mapper may find nothing; keep null entries out of the result
+            if (taskDefinition != null) {
+                postTaskDefinitionMap.put(code, taskDefinition);
+            }
+        }
+        // raw ArrayList: the map is declared with TaskDefinition values while the
+        // method returns a list of TaskDefinitionLog
+        return new ArrayList(postTaskDefinitionMap.values());
+    }
+
+    /**
      * parse locations
      *
      * @param locations processDefinition locations
@@ -2565,7 +2638,7 @@ public class ProcessService {
      * add authorized resources
      *
      * @param ownResources own resources
-     * @param userId userId
+     * @param userId       userId
      */
     private void addAuthorizedResources(List<Resource> ownResources, int 
userId) {
         List<Integer> relationResourceIds = 
resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 1593374..20ba75c 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -114,7 +114,7 @@ public class ProcessServiceTest {
         parentInstance.setWarningGroupId(0);
 
         TaskInstance task = new TaskInstance();
-        task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
+        task.setTaskParams("{\"processDefinitionId\":100}}");
         task.setId(10);
 
         ProcessInstance childInstance = null;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index e081590..ded1038 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -830,6 +830,7 @@ CREATE TABLE `t_ds_task_instance` (
   `retry_times` int(4) DEFAULT '0' COMMENT 'task retry times',
   `pid` int(4) DEFAULT NULL COMMENT 'pid of task',
   `app_link` text COMMENT 'yarn app id',
+  `task_params` text COMMENT 'job custom parameters',
   `flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
   `retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task 
failed ',
   `max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 2b381a4..5d0c214 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -689,6 +689,7 @@ CREATE TABLE t_ds_task_instance (
   retry_times int DEFAULT '0' ,
   pid int DEFAULT NULL ,
   app_link text ,
+  task_params text ,
   flag int DEFAULT '1' ,
   retry_interval int DEFAULT NULL ,
   max_retry_times int DEFAULT NULL ,

Reply via email to