This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 862565a  [Feature][JsonSplit] Fix dependTask bug (#5360)
862565a is described below

commit 862565a7c7d404845a43fb5db3eda28fc5cbf05a
Author: JinyLeeChina <[email protected]>
AuthorDate: Fri Apr 23 14:23:04 2021 +0800

    [Feature][JsonSplit] Fix dependTask bug (#5360)
    
    * update SnowFlake
    
    * update processDefinite from processInstance
    
    * update processDefinite from processInstance
    
    * Fix task logger path
    
    * Fix dependTask bug
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  1 -
 .../dao/mapper/TaskInstanceMapper.xml              |  4 +-
 .../master/runner/DependentTaskExecThread.java     | 61 ++++++++++------------
 .../server/master/runner/MasterExecThread.java     | 53 ++++++++++---------
 .../server/utils/DependentExecute.java             |  6 +--
 .../service/process/ProcessService.java            |  1 -
 6 files changed, 60 insertions(+), 66 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index f1da73f..b417d2f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -241,7 +241,6 @@ public class TaskInstance implements Serializable {
     /**
      * task params
      */
-    @TableField(exist = false)
     private String taskParams;
 
     public void init(String host, Date startTime, String executePath) {
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 96ec4bb..8c96e44 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,13 +22,13 @@
         id, name, task_type, process_instance_id, task_code, 
task_definition_version, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, 
retry_times, pid, app_link,
         flag, retry_interval, max_retry_times, task_instance_priority, 
worker_group, executor_id,
-        first_submit_time, delay_time, var_pool
+        first_submit_time, delay_time, task_params, var_pool
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, 
${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, 
${alias}.submit_time,
         ${alias}.start_time, ${alias}.end_time, ${alias}.host, 
${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, 
${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
         ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, 
${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
-        ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
+        ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, 
${alias}.var_pool
     </sql>
     <update id="setFailoverByHostAndStateArray">
         update t_ds_task_instance
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
index 4d13f3b..6b2bceb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
@@ -26,12 +26,11 @@ import 
org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -62,13 +61,13 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
     /**
      * dependent date
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
     private Date dependentDate;
 
     /**
      * constructor of MasterBaseTaskExecThread
      *
-     * @param taskInstance    task instance
+     * @param taskInstance task instance
      */
     public DependentTaskExecThread(TaskInstance taskInstance) {
         super(taskInstance);
@@ -78,7 +77,7 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
 
     @Override
     public Boolean submitWaitComplete() {
-        try{
+        try {
             logger.info("dependent task start");
             this.taskInstance = submit();
             logger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@@ -92,8 +91,8 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
             initDependParameters();
             waitTaskQuit();
             updateTaskState();
-        }catch (Exception e){
-            logger.error("dependent task run exception" , e);
+        } catch (Exception e) {
+            logger.error("dependent task run exception", e);
         }
         return true;
     }
@@ -102,16 +101,13 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
      * init dependent parameters
      */
     private void initDependParameters() {
-
         this.dependentParameters = taskInstance.getDependency();
-
-        for(DependentTaskModel taskModel : 
dependentParameters.getDependTaskList()){
-            this.dependentTaskList.add(new DependentExecute(
-                    taskModel.getDependItemList(), taskModel.getRelation()));
+        for (DependentTaskModel taskModel : 
dependentParameters.getDependTaskList()) {
+            this.dependentTaskList.add(new 
DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
         }
-        if(this.processInstance.getScheduleTime() != null){
+        if (this.processInstance.getScheduleTime() != null) {
             this.dependentDate = this.processInstance.getScheduleTime();
-        }else{
+        } else {
             this.dependentDate = new Date();
         }
     }
@@ -121,9 +117,9 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
      */
     private void updateTaskState() {
         ExecutionStatus status;
-        if(this.cancel){
+        if (this.cancel) {
             status = ExecutionStatus.KILL;
-        }else{
+        } else {
             DependResult result = getTaskDependResult();
             status = (result == DependResult.SUCCESS) ? 
ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
         }
@@ -144,8 +140,8 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
             return true;
         }
         while (Stopper.isRunning()) {
-            try{
-                if(this.processInstance == null){
+            try {
+                if (this.processInstance == null) {
                     logger.error("process instance not exists , master task 
exec thread exit");
                     return true;
                 }
@@ -153,12 +149,12 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
                     this.checkTimeoutFlag = !alertTimeout();
                     handleTimeoutFailed();
                 }
-                if(this.cancel || this.processInstance.getState() == 
ExecutionStatus.READY_STOP){
+                if (this.cancel || this.processInstance.getState() == 
ExecutionStatus.READY_STOP) {
                     cancelTaskInstance();
                     break;
                 }
 
-                if ( allDependentTaskFinish() || 
taskInstance.getState().typeIsFinished()){
+                if (allDependentTaskFinish() || 
taskInstance.getState().typeIsFinished()) {
                     break;
                 }
                 // update process task
@@ -166,7 +162,7 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
                 processInstance = 
processService.findProcessInstanceById(processInstance.getId());
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
-                logger.error("exception",e);
+                logger.error("exception", e);
                 if (processInstance != null) {
                     logger.error("wait task quit failed, instance id:{}, task 
id:{}",
                             processInstance.getId(), taskInstance.getId());
@@ -196,20 +192,20 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
 
     /**
      * judge all dependent tasks finish
+     *
      * @return whether all dependent tasks finish
      */
-    private boolean allDependentTaskFinish(){
+    private boolean allDependentTaskFinish() {
         boolean finish = true;
-        for(DependentExecute dependentExecute : dependentTaskList){
-            for(Map.Entry<String, DependResult> entry: 
dependentExecute.getDependResultMap().entrySet()) {
-                if(!dependResultMap.containsKey(entry.getKey())){
+        for (DependentExecute dependentExecute : dependentTaskList) {
+            for (Map.Entry<String, DependResult> entry : 
dependentExecute.getDependResultMap().entrySet()) {
+                if (!dependResultMap.containsKey(entry.getKey())) {
                     dependResultMap.put(entry.getKey(), entry.getValue());
                     //save depend result to log
-                    logger.info("dependent item complete {} {},{}",
-                            DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
+                    logger.info("dependent item complete {} {},{}", 
DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
                 }
             }
-            if(!dependentExecute.finish(dependentDate)){
+            if (!dependentExecute.finish(dependentDate)) {
                 finish = false;
             }
         }
@@ -218,17 +214,16 @@ public class DependentTaskExecThread extends 
MasterBaseTaskExecThread {
 
     /**
      * get dependent result
+     *
      * @return DependResult
      */
-    private DependResult getTaskDependResult(){
+    private DependResult getTaskDependResult() {
         List<DependResult> dependResultList = new ArrayList<>();
-        for(DependentExecute dependentExecute : dependentTaskList){
+        for (DependentExecute dependentExecute : dependentTaskList) {
             DependResult dependResult = 
dependentExecute.getModelDependResult(dependentDate);
             dependResultList.add(dependResult);
         }
-        DependResult result = DependentUtils.getDependResultForRelation(
-                this.dependentParameters.getRelation(), dependResultList
-        );
+        DependResult result = 
DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(),
 dependResultList);
         logger.info("dependent task completed, dependent result:{}", result);
         return result;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 23121cb..a0c560a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -34,6 +34,7 @@ import 
org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -448,13 +449,14 @@ public class MasterExecThread implements Runnable {
      * find task instance in db.
      * in case submit more than one same name task in the same time.
      *
-     * @param taskName task name
+     * @param taskCode task code
+     * @param taskVersion task version
      * @return TaskInstance
      */
-    private TaskInstance findTaskIfExists(String taskName) {
+    private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
         List<TaskInstance> taskInstanceList = 
processService.findValidTaskListByProcessId(this.processInstance.getId());
         for (TaskInstance taskInstance : taskInstanceList) {
-            if (taskInstance.getName().equals(taskName)) {
+            if (taskInstance.getTaskCode() == taskCode && 
taskInstance.getTaskDefinitionVersion() == taskVersion) {
                 return taskInstance;
             }
         }
@@ -465,20 +467,19 @@ public class MasterExecThread implements Runnable {
      * encapsulation task
      *
      * @param processInstance process instance
-     * @param nodeName node name
+     * @param taskNode taskNode
      * @return TaskInstance
      */
-    private TaskInstance createTaskInstance(ProcessInstance processInstance, 
String nodeName,
-                                            TaskNode taskNode) {
+    private TaskInstance createTaskInstance(ProcessInstance processInstance, 
TaskNode taskNode) {
         //update processInstance for update the globalParams
         this.processInstance = 
this.processService.findProcessInstanceById(this.processInstance.getId());
-        TaskInstance taskInstance = findTaskIfExists(nodeName);
+        TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), 
taskNode.getVersion());
         if (taskInstance == null) {
             taskInstance = new TaskInstance();
             taskInstance.setTaskCode(taskNode.getCode());
             taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
             // task name
-            taskInstance.setName(nodeName);
+            taskInstance.setName(taskNode.getName());
             // task instance state
             taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
             // process instance id
@@ -518,27 +519,28 @@ public class MasterExecThread implements Runnable {
             } else {
                 taskInstance.setWorkerGroup(taskWorkerGroup);
             }
-            //get process global
-            setProcessGlobal(taskNode, taskInstance);
+            if 
(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
+                taskInstance.setTaskParams(taskNode.getDependence());
+            } else {
+                
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getParams()));
+            }
             // delay execution time
             taskInstance.setDelayTime(taskNode.getDelayTime());
         }
         return taskInstance;
     }
 
-    private void setProcessGlobal(TaskNode taskNode, TaskInstance 
taskInstance) {
+    private String globalParamToTaskParams(String params) {
         String globalParams = this.processInstance.getGlobalParams();
-        if (StringUtils.isNotEmpty(globalParams)) {
-            Map<String, String> globalMap = 
processService.getGlobalParamMap(globalParams);
-            if (globalMap != null && globalMap.size() != 0) {
-                setGlobalMapToTask(taskNode, taskInstance, globalMap);
-            }
+        if (StringUtils.isBlank(globalParams)) {
+            return params;
         }
-    }
-
-    private void setGlobalMapToTask(TaskNode taskNode, TaskInstance 
taskInstance, Map<String, String> globalMap) {
-        // the param save in localParams
-        Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), 
String.class, Object.class);
+        Map<String, String> globalMap = 
processService.getGlobalParamMap(globalParams);
+        if (globalMap == null || globalMap.size() == 0) {
+            return params;
+        }
+        // the process global param save in localParams
+        Map<String, Object> result = JSONUtils.toMap(params, String.class, 
Object.class);
         Object localParams = result.get(LOCAL_PARAMS);
         if (localParams != null) {
             List<Property> allParam = 
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@@ -552,8 +554,8 @@ public class MasterExecThread implements Runnable {
                 }
             }
             result.put(LOCAL_PARAMS, allParam);
-            taskInstance.setTaskParams(JSONUtils.toJsonString(result));
         }
+        return JSONUtils.toJsonString(result);
     }
 
     private void submitPostNode(String parentNodeName) {
@@ -567,9 +569,10 @@ public class MasterExecThread implements Runnable {
                 throw new RuntimeException();
             }
             TaskNode taskNodeObject = dag.getNode(taskNode);
-            VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
-            taskInstances.add(createTaskInstance(processInstance, taskNode,
-                    taskNodeObject));
+            if 
(!TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNodeObject.getType())) {
+                VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, 
propToValue);
+            }
+            taskInstances.add(createTaskInstance(processInstance, 
taskNodeObject));
         }
 
         // if previous node success , post node submit
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 09c98fc..f273afd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -87,7 +87,7 @@ public class DependentExecute {
      */
     private DependResult getDependentResultForItem(DependentItem 
dependentItem, Date currentTime){
         List<DateInterval> dateIntervals = 
DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
-        return calculateResultForTasks(dependentItem, dateIntervals );
+        return calculateResultForTasks(dependentItem, dateIntervals);
     }
 
     /**
@@ -257,9 +257,7 @@ public class DependentExecute {
             }
             dependResultList.add(dependResult);
         }
-        modelDependResult = DependentUtils.getDependResultForRelation(
-                this.relation, dependResultList
-        );
+        modelDependResult = 
DependentUtils.getDependResultForRelation(this.relation, dependResultList);
         return modelDependResult;
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 89a89be..7433824 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1247,7 +1247,6 @@ public class ProcessService {
             if (taskInstance.isSubProcess()) {
                 taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
             } else {
-
                 if (processInstanceState != ExecutionStatus.READY_STOP
                         && processInstanceState != 
ExecutionStatus.READY_PAUSE) {
                     // failure task set invalid

Reply via email to