This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new c355896  update taskParams/add task delayTime/fix conditionType bug 
(#5385)
c355896 is described below

commit c3558965c16d0cdad618c01646ef014569aed661
Author: JinyLeeChina <[email protected]>
AuthorDate: Mon Apr 26 11:28:35 2021 +0800

    update taskParams/add task delayTime/fix conditionType bug (#5385)
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../apache/dolphinscheduler/common/Constants.java  |  2 ++
 .../dolphinscheduler/common/model/TaskNode.java    | 20 ++++++++++++
 .../common/utils/VarPoolUtils.java                 |  3 +-
 .../dao/entity/TaskDefinition.java                 | 14 ++++++++
 .../dao/entity/TaskDefinitionLog.java              |  1 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  4 ++-
 .../dao/mapper/TaskDefinitionLogMapper.xml         |  6 ++--
 .../dao/mapper/TaskDefinitionMapper.xml            |  6 ++--
 .../master/runner/ConditionsTaskExecThread.java    | 37 +++++++++-------------
 .../server/master/runner/MasterExecThread.java     | 17 ++++------
 .../service/process/ProcessService.java            | 35 ++++++++++----------
 sql/dolphinscheduler_mysql.sql                     |  2 ++
 sql/dolphinscheduler_postgre.sql                   |  8 ++---
 13 files changed, 93 insertions(+), 62 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index db3cd14..148df29 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -789,6 +789,8 @@ public final class Constants {
     public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId";
     public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
     public static final String PARENT_WORKFLOW_INSTANCE = 
"parentWorkflowInstance";
+    public static final String CONDITION_RESULT = "conditionResult";
+    public static final String DEPENDENCE = "dependence";
     public static final String TASK_TYPE = "taskType";
     public static final String TASK_LIST = "taskList";
     public static final String RWXR_XR_X = "rwxr-xr-x";
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index 89efe2e..b9c5a28 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -25,7 +25,9 @@ import 
org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@@ -371,6 +373,24 @@ public class TaskNode {
         this.preTaskNodeList = preTaskNodeList;
     }
 
+    public String getTaskParams() {
+        Map<String, Object> taskParams = JSONUtils.toMap(this.params, 
String.class, Object.class);
+        if (taskParams == null) {
+            taskParams = new HashMap<>();
+        }
+        taskParams.put(Constants.CONDITION_RESULT, this.conditionResult);
+        taskParams.put(Constants.DEPENDENCE, this.dependence);
+        return JSONUtils.toJsonString(taskParams);
+    }
+
+    public Map<String, Object> taskParamsToJsonObj(String taskParams) {
+        Map<String, Object> taskParamsMap = JSONUtils.toMap(taskParams, 
String.class, Object.class);
+        if (taskParamsMap == null) {
+            taskParamsMap = new HashMap<>();
+        }
+        return taskParamsMap;
+    }
+
     @Override
     public String toString() {
         return "TaskNode{"
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
index 3d4f65a..cd300e3 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
@@ -39,8 +39,7 @@ public class VarPoolUtils {
      */
     public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, 
Object> propToValue) {
         String taskParamsJson = taskNode.getParams();
-        Map<String,Object> taskParams = JSONUtils.parseObject(taskParamsJson, 
HashMap.class);
-
+        Map<String,Object> taskParams = JSONUtils.toMap(taskParamsJson, 
String.class, Object.class);
         Object localParamsObject = taskParams.get(LOCALPARAMS);
         if (null != localParamsObject && null != propToValue && 
propToValue.size() > 0) {
             ArrayList<Object> localParams = (ArrayList)localParamsObject;
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index e757ea3..f4ef1a9 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -153,6 +153,11 @@ public class TaskDefinition {
     private int timeout;
 
     /**
+     * delay execution time.
+     */
+    private int delayTime;
+
+    /**
      * resource ids
      */
     private String resourceIds;
@@ -381,6 +386,14 @@ public class TaskDefinition {
         this.resourceIds = resourceIds;
     }
 
+    public int getDelayTime() {
+        return delayTime;
+    }
+
+    public void setDelayTime(int delayTime) {
+        this.delayTime = delayTime;
+    }
+
     @Override
     public String toString() {
         return "TaskDefinition{"
@@ -405,6 +418,7 @@ public class TaskDefinition {
                 + ", timeoutFlag=" + timeoutFlag
                 + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy
                 + ", timeout=" + timeout
+                + ", delayTime=" + delayTime
                 + ", resourceIds='" + resourceIds + '\''
                 + ", createTime=" + createTime
                 + ", updateTime=" + updateTime
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index dfade53..96851cc 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -63,6 +63,7 @@ public class TaskDefinitionLog extends TaskDefinition {
         
this.setTimeoutNotifyStrategy(taskDefinition.getTimeoutNotifyStrategy());
         this.setTaskType(taskDefinition.getTaskType());
         this.setTimeout(taskDefinition.getTimeout());
+        this.setDelayTime(taskDefinition.getDelayTime());
         this.setTimeoutFlag(taskDefinition.getTimeoutFlag());
         this.setUpdateTime(taskDefinition.getUpdateTime());
         this.setCreateTime(taskDefinition.getCreateTime());
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index b417d2f..3733c6d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao.entity;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
@@ -415,7 +416,8 @@ public class TaskInstance implements Serializable {
 
     public DependentParameters getDependency() {
         if (this.dependency == null) {
-            this.dependency = JSONUtils.parseObject(this.getTaskParams(), 
DependentParameters.class);
+            Map<String, Object> taskParamsMap = 
JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
+            this.dependency = JSONUtils.parseObject((String) 
taskParamsMap.get(Constants.DEPENDENCE), DependentParameters.class);
         }
         return this.dependency;
     }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index 673a266..7f31843 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -20,13 +20,13 @@
 <mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper">
     <sql id="baseSql">
         id, code, `name`, version, description, project_code, user_id, 
task_type, task_params, flag, task_priority,
-        worker_group, fail_retry_times, fail_retry_interval, timeout_flag, 
timeout_notify_strategy, timeout,
+        worker_group, fail_retry_times, fail_retry_interval, timeout_flag, 
timeout_notify_strategy, timeout, delay_time,
         resource_ids, operator, operate_time, create_time, update_time
     </sql>
     <select id="queryByDefinitionName" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
         select td.id, td.code, td.name, td.version, td.description, 
td.project_code, td.user_id, td.task_type, td.task_params,
-        td.flag, td.task_priority, td.worker_group, td.fail_retry_times, 
td.fail_retry_interval, td.timeout_flag,
-        td.timeout_notify_strategy, td.timeout, td.resource_ids, 
td.operator,td.operate_time, td.create_time, td.update_time,
+        td.flag, td.task_priority, td.worker_group, td.fail_retry_times, 
td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
+        td.timeout, td.delay_time, td.resource_ids, 
td.operator,td.operate_time, td.create_time, td.update_time,
         u.user_name,p.name as project_name
         from t_ds_task_definition_log td
         JOIN t_ds_user u ON td.user_id = u.id
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 2556b37..89801ef 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -20,7 +20,7 @@
 <mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper">
     <sql id="baseSql">
         id, code, `name`, version, description, project_code, user_id, 
task_type, task_params, flag, task_priority,
-        worker_group, fail_retry_times, fail_retry_interval, timeout_flag, 
timeout_notify_strategy, timeout,
+        worker_group, fail_retry_times, fail_retry_interval, timeout_flag, 
timeout_notify_strategy, timeout, delay_time,
         resource_ids, create_time, update_time
     </sql>
     <select id="queryByDefinitionName" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
@@ -63,8 +63,8 @@
     </select>
     <select id="queryByDefinitionId" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select td.id, td.code, td.name, td.version, td.description, 
td.project_code, td.user_id, td.task_type, td.task_params,
-        td.flag, td.task_priority, td.worker_group, td.fail_retry_times, 
td.fail_retry_interval, td.timeout_flag,
-        td.timeout_notify_strategy, td.timeout, td.resource_ids, 
td.create_time, td.update_time, u.user_name,p.name as project_name
+        td.flag, td.task_priority, td.worker_group, td.fail_retry_times, 
td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
+        td.timeout, td.delay_time, td.resource_ids, td.create_time, 
td.update_time, u.user_name,p.name as project_name
         from t_ds_task_definition td
         JOIN t_ds_user u ON td.user_id = u.id
         JOIN t_ds_project p ON td.project_code = p.code
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
index 016ec7d..5fa9fc1 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
@@ -56,7 +56,7 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
     /**
      * constructor of MasterBaseTaskExecThread
      *
-     * @param taskInstance    task instance
+     * @param taskInstance task instance
      */
     public ConditionsTaskExecThread(TaskInstance taskInstance) {
         super(taskInstance);
@@ -65,7 +65,7 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
 
     @Override
     public Boolean submitWaitComplete() {
-        try{
+        try {
             this.taskInstance = submit();
             logger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                     processInstance.getProcessDefinitionCode(),
@@ -78,33 +78,28 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
             logger.info("dependent task start");
             waitTaskQuit();
             updateTaskState();
-        }catch (Exception e){
-            logger.error("conditions task run exception" , e);
+        } catch (Exception e) {
+            logger.error("conditions task run exception", e);
         }
         return true;
     }
 
     private void waitTaskQuit() {
-        List<TaskInstance> taskInstances = 
processService.findValidTaskListByProcessId(
-                taskInstance.getProcessInstanceId()
-        );
-        for(TaskInstance task : taskInstances){
+        List<TaskInstance> taskInstances = 
processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
+        for (TaskInstance task : taskInstances) {
             completeTaskList.putIfAbsent(task.getName(), task.getState());
         }
 
         List<DependResult> modelResultList = new ArrayList<>();
-        for(DependentTaskModel dependentTaskModel : 
dependentParameters.getDependTaskList()){
-
+        for (DependentTaskModel dependentTaskModel : 
dependentParameters.getDependTaskList()) {
             List<DependResult> itemDependResult = new ArrayList<>();
-            for(DependentItem item : dependentTaskModel.getDependItemList()){
+            for (DependentItem item : dependentTaskModel.getDependItemList()) {
                 itemDependResult.add(getDependResultForItem(item));
             }
             DependResult modelResult = 
DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), 
itemDependResult);
             modelResultList.add(modelResult);
         }
-        conditionResult = DependentUtils.getDependResultForRelation(
-                dependentParameters.getRelation(), modelResultList
-        );
+        conditionResult = 
DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), 
modelResultList);
         logger.info("the conditions task depend result : {}", conditionResult);
     }
 
@@ -113,9 +108,9 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
      */
     private void updateTaskState() {
         ExecutionStatus status;
-        if(this.cancel){
+        if (this.cancel) {
             status = ExecutionStatus.KILL;
-        }else{
+        } else {
             status = (conditionResult == DependResult.SUCCESS) ? 
ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
         }
         taskInstance.setState(status);
@@ -137,20 +132,18 @@ public class ConditionsTaskExecThread extends 
MasterBaseTaskExecThread {
 
     /**
      * depend result for depend item
-     * @param item
-     * @return
      */
-    private DependResult getDependResultForItem(DependentItem item){
+    private DependResult getDependResultForItem(DependentItem item) {
 
         DependResult dependResult = DependResult.SUCCESS;
-        if(!completeTaskList.containsKey(item.getDepTasks())){
+        if (!completeTaskList.containsKey(item.getDepTasks())) {
             logger.info("depend item: {} have not completed yet.", 
item.getDepTasks());
             dependResult = DependResult.FAILED;
             return dependResult;
         }
         ExecutionStatus executionStatus = 
completeTaskList.get(item.getDepTasks());
-        if(executionStatus != item.getStatus()){
-            logger.info("depend item : {} expect status: {}, actual status: 
{}" ,item.getDepTasks(), item.getStatus(), executionStatus);
+        if (executionStatus != item.getStatus()) {
+            logger.info("depend item : {} expect status: {}, actual status: 
{}", item.getDepTasks(), item.getStatus(), executionStatus);
             dependResult = DependResult.FAILED;
         }
         logger.info("dependent item complete {} {},{}",
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index a0c560a..20d168d 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -34,7 +34,6 @@ import 
org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -175,7 +174,7 @@ public class MasterExecThread implements Runnable {
      *
      * @param parentNodeName parent node name
      */
-    private Map<String, Object> propToValue = new ConcurrentHashMap<String, 
Object>();
+    private Map<String, Object> propToValue = new ConcurrentHashMap<>();
 
     /**
      * constructor of MasterExecThread
@@ -519,11 +518,7 @@ public class MasterExecThread implements Runnable {
             } else {
                 taskInstance.setWorkerGroup(taskWorkerGroup);
             }
-            if 
(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
-                taskInstance.setTaskParams(taskNode.getDependence());
-            } else {
-                
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getParams()));
-            }
+            
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
             // delay execution time
             taskInstance.setDelayTime(taskNode.getDelayTime());
         }
@@ -545,8 +540,11 @@ public class MasterExecThread implements Runnable {
         if (localParams != null) {
             List<Property> allParam = 
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
             for (Property info : allParam) {
+                String paramName = info.getProp();
+                if (StringUtils.isNotEmpty(paramName) && 
propToValue.containsKey(paramName)) {
+                    info.setValue((String) propToValue.get(paramName));
+                }
                 if (info.getDirect().equals(Direct.IN)) {
-                    String paramName = info.getProp();
                     String value = globalMap.get(paramName);
                     if (StringUtils.isNotEmpty(value)) {
                         info.setValue(value);
@@ -569,9 +567,6 @@ public class MasterExecThread implements Runnable {
                 throw new RuntimeException();
             }
             TaskNode taskNodeObject = dag.getNode(taskNode);
-            if 
(!TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNodeObject.getType())) {
-                VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, 
propToValue);
-            }
             taskInstances.add(createTaskInstance(processInstance, 
taskNodeObject));
         }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7868348..f4f326a 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
-import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -110,8 +109,6 @@ import 
org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
-import org.apache.commons.collections.map.HashedMap;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -2200,7 +2197,7 @@ public class ProcessService {
         taskDefinition.setName(taskNode.getName());
         taskDefinition.setDescription(taskNode.getDesc());
         taskDefinition.setTaskType(taskNode.getType().toUpperCase());
-        
taskDefinition.setTaskParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())
 ? taskNode.getDependence() : taskNode.getParams());
+        taskDefinition.setTaskParams(taskNode.getTaskParams());
         taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
         taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
         taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
@@ -2209,6 +2206,7 @@ public class ProcessService {
         
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? 
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
         
taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
         
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
+        taskDefinition.setDelayTime(taskNode.getDelayTime());
         taskDefinition.setResourceIds(getResourceIds(taskDefinition));
     }
 
@@ -2221,7 +2219,6 @@ public class ProcessService {
     public String getResourceIds(TaskDefinition taskDefinition) {
         Set<Integer> resourceIds = null;
         AbstractParameters params = 
TaskParametersUtils.getParameters(taskDefinition.getTaskType(), 
taskDefinition.getTaskParams());
-
         if (params != null && 
CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
             resourceIds = params.getResourceFilesList().
                     stream()
@@ -2341,7 +2338,8 @@ public class ProcessService {
             List<String> depList = taskNode.getDepList();
             if (CollectionUtils.isNotEmpty(depList)) {
                 for (String preTaskName : depList) {
-                    builderRelationList.add(new ProcessTaskRelation("",
+                    builderRelationList.add(new ProcessTaskRelation(
+                            StringUtils.EMPTY,
                             processDefinition.getVersion(),
                             projectCode,
                             processDefinition.getCode(),
@@ -2350,12 +2348,13 @@ public class ProcessService {
                             
taskDefinitionMap.get(taskNode.getName()).getCode(),
                             
taskDefinitionMap.get(taskNode.getName()).getVersion(),
                             ConditionType.NONE,
-                            taskNode.getConditionResult(),
+                            StringUtils.EMPTY,
                             now,
                             now));
                 }
             } else {
-                builderRelationList.add(new ProcessTaskRelation("",
+                builderRelationList.add(new ProcessTaskRelation(
+                        StringUtils.EMPTY,
                         processDefinition.getVersion(),
                         projectCode,
                         processDefinition.getCode(),
@@ -2363,8 +2362,8 @@ public class ProcessService {
                         0,
                         taskDefinitionMap.get(taskNode.getName()).getCode(),
                         taskDefinitionMap.get(taskNode.getName()).getVersion(),
-                        ConditionType.of("none"),
-                        taskNode.getConditionResult(),
+                        ConditionType.NONE,
+                        StringUtils.EMPTY,
                         now,
                         now));
             }
@@ -2464,7 +2463,6 @@ public class ProcessService {
                     v = new TaskNode();
                     v.setCode(processTaskRelation.getPostTaskCode());
                     v.setVersion(processTaskRelation.getPostTaskVersion());
-                    
v.setConditionResult(processTaskRelation.getConditionParams());
                     List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
                     if (processTaskRelation.getPreTaskCode() > 0) {
                         preTaskNodeList.add(new 
PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", 
processTaskRelation.getPreTaskVersion()));
@@ -2489,13 +2487,18 @@ public class ProcessService {
             v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? 
Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
             v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
             v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
-            
v.setParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType())
 ? null : taskDefinitionLog.getTaskParams());
-            
v.setDependence(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType())
 ? taskDefinitionLog.getTaskParams() : null);
+            Map<String, Object> taskParamsMap = 
v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
+            v.setConditionResult((String) 
taskParamsMap.get(Constants.CONDITION_RESULT));
+            v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
+            taskParamsMap.remove(Constants.CONDITION_RESULT);
+            taskParamsMap.remove(Constants.DEPENDENCE);
+            v.setParams(JSONUtils.toJsonString(taskParamsMap));
             v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
             v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
             v.setTimeout(JSONUtils.toJsonString(new 
TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
                     taskDefinitionLog.getTimeoutNotifyStrategy(),
                     taskDefinitionLog.getTimeout())));
+            v.setDelayTime(taskDefinitionLog.getDelayTime());
             v.getPreTaskNodeList().forEach(task -> 
task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
             
v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
         });
@@ -2503,7 +2506,7 @@ public class ProcessService {
     }
 
     /**
-     * find task definition by code and verision
+     * find task definition by code and version
      *
      * @param taskCode
      * @param taskDefinitionVersion
@@ -2514,7 +2517,7 @@ public class ProcessService {
     }
 
     /**
-     * query taks definition list by process code and process version
+     * query tasks definition list by process code and process version
      *
      * @param processCode
      * @param processVersion
@@ -2523,7 +2526,7 @@ public class ProcessService {
     public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, 
int processVersion) {
         List<ProcessTaskRelationLog> processTaskRelationLogs =
                 
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, 
processVersion);
-        Map<Long, TaskDefinition> postTaskDefinitionMap = new HashedMap();
+        Map<Long, TaskDefinition> postTaskDefinitionMap = new HashMap<>();
         processTaskRelationLogs.forEach(processTaskRelationLog -> {
             Long code = processTaskRelationLog.getPostTaskCode();
             int version = processTaskRelationLog.getPostTaskVersion();
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index cc1d70a..57eeaca 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -465,6 +465,7 @@ CREATE TABLE `t_ds_task_definition` (
   `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
   `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout 
notification policy: 0 warning, 1 fail',
   `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
+  `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
   `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by 
comma',
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime DEFAULT NULL COMMENT 'update time',
@@ -494,6 +495,7 @@ CREATE TABLE `t_ds_task_definition_log` (
   `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
   `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout 
notification policy: 0 warning, 1 fail',
   `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
+  `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
   `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by 
comma',
   `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
   `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 37b07f3..86135b3 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -330,8 +330,7 @@ CREATE TABLE t_ds_process_definition_log (
   operate_time timestamp DEFAULT NULL ,
   create_time timestamp DEFAULT NULL ,
   update_time timestamp DEFAULT NULL ,
-  PRIMARY KEY (id),
-  CONSTRAINT process_definition_unique UNIQUE (name, project_id)
+  PRIMARY KEY (id)
 ) ;
 
 DROP TABLE IF EXISTS t_ds_task_definition;
@@ -353,6 +352,7 @@ CREATE TABLE t_ds_task_definition (
   timeout_flag int DEFAULT NULL ,
   timeout_notify_strategy int DEFAULT NULL ,
   timeout int DEFAULT '0' ,
+  delay_time int DEFAULT '0' ,
   resource_ids varchar(255) DEFAULT NULL ,
   create_time timestamp DEFAULT NULL ,
   update_time timestamp DEFAULT NULL ,
@@ -381,13 +381,13 @@ CREATE TABLE t_ds_task_definition_log (
   timeout_flag int DEFAULT NULL ,
   timeout_notify_strategy int DEFAULT NULL ,
   timeout int DEFAULT '0' ,
+  delay_time int DEFAULT '0' ,
   resource_ids varchar(255) DEFAULT NULL ,
   operator int DEFAULT NULL ,
   operate_time timestamp DEFAULT NULL ,
   create_time timestamp DEFAULT NULL ,
   update_time timestamp DEFAULT NULL ,
-  PRIMARY KEY (id),
-  CONSTRAINT task_definition_unique UNIQUE (name, project_id)
+  PRIMARY KEY (id)
 ) ;
 
 DROP TABLE IF EXISTS t_ds_process_task_relation;

Reply via email to