This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch params-trans
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/params-trans by this push:
     new 87ff13f  [Feature-#3805][server-master] global params of master (#4678)
87ff13f is described below

commit 87ff13fc865ffb456e94aa197e367b72239249e8
Author: wangxj3 <[email protected]>
AuthorDate: Sun Feb 7 17:10:33 2021 +0800

    [Feature-#3805][server-master] global params of master (#4678)
    
    * global initParam and set Param
    
    * fix dataFormat error
    
    * fix deal outParams bug
    
    * fix code style
    
    * fix code style
    
    * fix code style
    
    * fix code style
    
    * fix code style
    
    * fix code style
    
    * fix code style
    
    * add test
    
    * fix code style (variable name)
    
    * fix reset globalParams bug
    
    Co-authored-by: wangxj <wangxj31>
---
 .../dao/mapper/ProcessInstanceMapper.java          |   3 +
 .../dao/mapper/ProcessInstanceMapper.xml           |   6 +-
 .../remote/command/TaskExecuteResponseCommand.java |  13 +-
 .../master/processor/TaskResponseProcessor.java    |   4 +-
 .../master/processor/queue/TaskResponseEvent.java  |  16 +-
 .../processor/queue/TaskResponseService.java       |   3 +-
 .../server/master/runner/MasterExecThread.java     |  51 +++++-
 .../processor/queue/TaskResponseServiceTest.java   |   3 +-
 .../service/process/ProcessService.java            | 176 ++++++++++++++-------
 .../service/process/ProcessServiceTest.java        |  22 +++
 10 files changed, 231 insertions(+), 66 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index a9ebbf0..7116dc4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
             @Param("processDefinitionId") int processDefinitionId,
             @Param("states") int[] states);
 
+    int updateGlobalParamById(
+            @Param("globalParams") String globalParams,
+            @Param("id")  int id);
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f661635..793b58e 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -219,5 +219,9 @@
         </foreach>
         order by id asc
     </select>
-
+    <update id="updateGlobalParamById">
+        update t_ds_process_instance
+        set global_params = #{globalParams}
+        where id = #{id}
+    </update>
 </mapper>
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index de5b82c..21fe471 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -68,7 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable {
      * varPool string
      */
     private String varPool;
-
+    /**
+     * task return result
+     */
+    private String result;
     public void setVarPool(String varPool) {
         this.varPool = varPool;
     }
@@ -139,4 +142,12 @@ public class TaskExecuteResponseCommand implements Serializable {
                 + ", appIds='" + appIds + '\''
                 + '}';
     }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 080fdd5..186c4f3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
                 responseCommand.getAppIds(),
                 responseCommand.getTaskInstanceId(),
                 responseCommand.getVarPool(),
-                channel);
+                channel,
+                responseCommand.getResult()
+                );
         taskResponseService.addResponse(taskResponseEvent);
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 0ca558a..9789bcc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -92,6 +92,10 @@ public class TaskResponseEvent {
      * channel
      */
     private Channel channel;
+    /**
+     * task return result
+     */
+    private String result;
     
     public static TaskResponseEvent newAck(ExecutionStatus state,
                                            Date startTime,
@@ -118,7 +122,8 @@ public class TaskResponseEvent {
                                               String appIds,
                                               int taskInstanceId,
                                               String varPool,
-                                              Channel channel) {
+                                              Channel channel,
+                                              String result) {
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setEndTime(endTime);
@@ -128,6 +133,7 @@ public class TaskResponseEvent {
         event.setEvent(Event.RESULT);
         event.setVarPool(varPool);
         event.setChannel(channel);
+        event.setResult(result);
         return event;
     }
 
@@ -226,4 +232,12 @@ public class TaskResponseEvent {
     public void setChannel(Channel channel) {
         this.channel = channel;
     }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 1b5eddb..f3f2e7f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -165,7 +165,8 @@ public class TaskResponseService {
                             taskResponseEvent.getProcessId(),
                             taskResponseEvent.getAppIds(),
                             taskResponseEvent.getTaskInstanceId(),
-                            taskResponseEvent.getVarPool()
+                            taskResponseEvent.getVarPool(),
+                                taskResponseEvent.getResult()
                         );
                     }
                     // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 3b113b6..0e6c0d8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -67,6 +70,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +78,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable {
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
                                             TaskNode taskNode) {
-
+        //update processInstance for update the globalParams
+        this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
         TaskInstance taskInstance = findTaskIfExists(nodeName);
         if (taskInstance == null) {
             taskInstance = new TaskInstance();
@@ -540,13 +546,53 @@ public class MasterExecThread implements Runnable {
             } else {
                 taskInstance.setWorkerGroup(taskWorkerGroup);
             }
-
+            //get process global
+            setProcessGlobal(taskNode, taskInstance);
             // delay execution time
             taskInstance.setDelayTime(taskNode.getDelayTime());
         }
         return taskInstance;
     }
 
+    private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) {
+        String globalParams = this.processInstance.getGlobalParams();
+        if (StringUtils.isNotEmpty(globalParams)) {
+            Map<String, String> globalMap = getGlobalParamMap(globalParams);
+            if (globalMap != null) {
+                // the param save in localParams
+                Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+                Object localParams = result.get(LOCAL_PARAMS);
+                if (localParams != null) {
+                    List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+                    for (Property info : allParam) {
+                        if (info.getDirect().equals(Direct.IN)) {
+                            String paramName = info.getProp();
+                            String value = globalMap.get(paramName);
+                            if (StringUtils.isNotEmpty(value)) {
+                                info.setValue(value);
+                            }
+                        }
+                    }
+                    result.put(LOCAL_PARAMS, allParam);
+                    taskNode.setParams(JSONUtils.toJsonString(result));
+                    // task instance node json
+                    taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+                }
+            }
+        }
+    }
+
+    public Map<String, String> getGlobalParamMap(String globalParams) {
+        List<Property> propList;
+        Map<String,String> globalParamMap = new HashMap<>();
+        if (StringUtils.isNotEmpty(globalParams)) {
+            propList = JSONUtils.toList(globalParams, Property.class);
+            globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+        }
+
+        return globalParamMap;
+    }
+
     private void submitPostNode(String parentNodeName) {
         Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
         List<TaskInstance> taskInstances = new ArrayList<>();
@@ -952,6 +998,7 @@ public class MasterExecThread implements Runnable {
                 // node success , post node submit
                 if (task.getState() == ExecutionStatus.SUCCESS) {
                     processInstance.setVarPool(task.getVarPool());
+                    processInstance = processService.findProcessInstanceById(processInstance.getId());
                     processService.updateProcessInstance(processInstance);
                     completeTaskList.put(task.getName(), task);
                     submitPostNode(task.getName());
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 5d10f84..ec0807c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -70,7 +70,8 @@ public class TaskResponseServiceTest {
             "ids",
             22,
             "varPol",
-            channel);
+            channel,
+                "[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
 
         taskInstance = new TaskInstance();
         taskInstance.setId(22);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..d77a654 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
 
 import static java.util.stream.Collectors.toSet;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -89,6 +91,7 @@ import java.util.Date;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -103,6 +106,8 @@ import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
 import com.cronutils.model.Cron;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -161,10 +166,10 @@ public class ProcessService {
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
-     * @param logger logger
-     * @param host host
+     * @param logger         logger
+     * @param host           host
      * @param validThreadNum validThreadNum
-     * @param command found command
+     * @param command        found command
      * @return process instance
      */
     @Transactional(rollbackFor = Exception.class)
@@ -204,7 +209,7 @@ public class ProcessService {
     /**
      * set process waiting thread
      *
-     * @param command command
+     * @param command         command
      * @param processInstance processInstance
      * @return process instance
      */
@@ -222,7 +227,7 @@ public class ProcessService {
     /**
      * check thread num
      *
-     * @param command command
+     * @param command        command
      * @param validThreadNum validThreadNum
      * @return if thread is enough
      */
@@ -425,7 +430,7 @@ public class ProcessService {
      * recursive query sub process definition id by parent id.
      *
      * @param parentId parentId
-     * @param ids ids
+     * @param ids      ids
      */
     public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
         ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
@@ -456,7 +461,7 @@ public class ProcessService {
      * create recovery waiting thread  command and delete origin command at the same time.
      * if the recovery command is exists, only update the field update_time
      *
-     * @param originCommand originCommand
+     * @param originCommand   originCommand
      * @param processInstance processInstance
      */
     public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
@@ -508,7 +513,7 @@ public class ProcessService {
     /**
      * get schedule time from command
      *
-     * @param command command
+     * @param command  command
      * @param cmdParam cmdParam map
      * @return date
      */
@@ -524,8 +529,8 @@ public class ProcessService {
      * generate a new work process instance from command.
      *
      * @param processDefinition processDefinition
-     * @param command command
-     * @param cmdParam cmdParam map
+     * @param command           command
+     * @param cmdParam          cmdParam map
      * @return process instance
      */
     private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@@ -601,7 +606,7 @@ public class ProcessService {
      * use definition creator's tenant.
      *
      * @param tenantId tenantId
-     * @param userId userId
+     * @param userId   userId
      * @return tenant
      */
     public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -624,7 +629,7 @@ public class ProcessService {
     /**
      * check command parameters is valid
      *
-     * @param command command
+     * @param command  command
      * @param cmdParam cmdParam map
      * @return whether command param is valid
      */
@@ -644,7 +649,7 @@ public class ProcessService {
      * construct process instance according to one command.
      *
      * @param command command
-     * @param host host
+     * @param host    host
      * @return process instance
      */
     private ProcessInstance constructProcessInstance(Command command, String host) {
@@ -686,11 +691,6 @@ public class ProcessService {
             } else {
                 processInstance = this.findProcessInstanceDetailById(processInstanceId);
                 // Recalculate global parameters after rerun.
-                processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                    processDefinition.getGlobalParamMap(),
-                    processDefinition.getGlobalParamList(),
-                    getCommandTypeIfComplement(processInstance, command),
-                    processInstance.getScheduleTime()));
             }
             processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
             processInstance.setProcessDefinition(processDefinition);
@@ -807,7 +807,7 @@ public class ProcessService {
      * return complement data if the process start with complement data
      *
      * @param processInstance processInstance
-     * @param command command
+     * @param command         command
      * @return command type
      */
     private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@@ -822,8 +822,8 @@ public class ProcessService {
      * initialize complement data parameters
      *
      * @param processDefinition processDefinition
-     * @param processInstance processInstance
-     * @param cmdParam cmdParam
+     * @param processInstance   processInstance
+     * @param cmdParam          cmdParam
      */
     private void initComplementDataParam(ProcessDefinition processDefinition,
                                          ProcessInstance processInstance,
@@ -895,7 +895,7 @@ public class ProcessService {
      * only the keys doesn't in sub process global would be joined.
      *
      * @param parentGlobalParams parentGlobalParams
-     * @param subGlobalParams subGlobalParams
+     * @param subGlobalParams    subGlobalParams
      * @return global params join
      */
     private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@@ -965,7 +965,7 @@ public class ProcessService {
      * set map {parent instance id, task instance id, 0(child instance id)}
      *
      * @param parentInstance parentInstance
-     * @param parentTask parentTask
+     * @param parentTask     parentTask
      * @return process instance map
      */
     private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@@ -994,7 +994,7 @@ public class ProcessService {
      * find previous task work process map.
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param parentTask parentTask
+     * @param parentTask            parentTask
      * @return process instance map
      */
     private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@@ -1020,7 +1020,7 @@ public class ProcessService {
      * create sub work process command
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param task task
+     * @param task                  task
      */
     public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
         if (!task.isSubProcess()) {
@@ -1119,7 +1119,7 @@ public class ProcessService {
      * update sub process definition
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param childDefinitionId childDefinitionId
+     * @param childDefinitionId     childDefinitionId
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
         ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@@ -1133,7 +1133,7 @@ public class ProcessService {
     /**
      * submit task to mysql
      *
-     * @param taskInstance taskInstance
+     * @param taskInstance    taskInstance
      * @param processInstance processInstance
      * @return task instance
      */
@@ -1187,16 +1187,16 @@ public class ProcessService {
      * return stop if work process state is ready stop
      * if all of above are not satisfied, return submit success
      *
-     * @param taskInstance taskInstance
+     * @param taskInstance         taskInstance
      * @param processInstanceState processInstanceState
      * @return process instance state
      */
     public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
         ExecutionStatus state = taskInstance.getState();
+        // running, delayed or killed
+        // the task already exists in task queue
+        // return state
         if (
-                // running, delayed or killed
-                // the task already exists in task queue
-                // return state
                 state == ExecutionStatus.RUNNING_EXECUTION
                         || state == ExecutionStatus.DELAY_EXECUTION
                         || state == ExecutionStatus.KILL
@@ -1363,7 +1363,7 @@ public class ProcessService {
      * get id list by task state
      *
      * @param instanceId instanceId
-     * @param state state
+     * @param state      state
      * @return task instance states
      */
     public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
@@ -1418,7 +1418,7 @@ public class ProcessService {
      * find work process map by parent process id and parent task id.
      *
      * @param parentWorkProcessId parentWorkProcessId
-     * @param parentTaskId parentTaskId
+     * @param parentTaskId        parentTaskId
      * @return process instance map
      */
     public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
@@ -1440,7 +1440,7 @@ public class ProcessService {
      * find sub process instance
      *
      * @param parentProcessId parentProcessId
-     * @param parentTaskId parentTaskId
+     * @param parentTaskId    parentTaskId
      * @return process instance
      */
     public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
@@ -1472,12 +1472,12 @@ public class ProcessService {
     /**
      * change task state
      *
-     * @param state state
-     * @param startTime startTime
-     * @param host host
+     * @param state       state
+     * @param startTime   startTime
+     * @param host        host
      * @param executePath executePath
-     * @param logPath logPath
-     * @param taskInstId taskInstId
+     * @param logPath     logPath
+     * @param taskInstId  taskInstId
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
                                 String executePath,
@@ -1505,12 +1505,12 @@ public class ProcessService {
      * update the process instance
      *
      * @param processInstanceId processInstanceId
-     * @param processJson processJson
-     * @param globalParams globalParams
-     * @param scheduleTime scheduleTime
-     * @param flag flag
-     * @param locations locations
-     * @param connects connects
+     * @param processJson       processJson
+     * @param globalParams      globalParams
+     * @param scheduleTime      scheduleTime
+     * @param flag              flag
+     * @param locations         locations
+     * @param connects          connects
      * @return update process instance result
      */
     public int updateProcessInstance(Integer processInstanceId, String processJson,
@@ -1531,25 +1531,85 @@ public class ProcessService {
     /**
      * change task state
      *
-     * @param state state
-     * @param endTime endTime
+     * @param state      state
+     * @param endTime    endTime
      * @param taskInstId taskInstId
-     * @param varPool varPool
+     * @param varPool    varPool
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
                                 Date endTime,
                                 int processId,
                                 String appIds,
                                 int taskInstId,
-                                String varPool) {
+                                String varPool,
+                                String result) {
         taskInstance.setPid(processId);
         taskInstance.setAppLink(appIds);
         taskInstance.setState(state);
         taskInstance.setEndTime(endTime);
         taskInstance.setVarPool(varPool);
+        changeOutParam(result, taskInstance);
         saveTaskInstance(taskInstance);
     }
 
+    public void changeOutParam(String result, TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(result)) {
+            return;
+        }
+        List<Map<String, String>> workerResultParam = getListMapByString(result);
+        if (CollectionUtils.isEmpty(workerResultParam)) {
+            return;
+        }
+        //if the result more than one line,just get the first .
+        Map<String, String> row = workerResultParam.get(0);
+        if (row == null || row.size() == 0) {
+            return;
+        }
+        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+        Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+        Object localParams = taskParams.get(LOCAL_PARAMS);
+        if (localParams == null) {
+            return;
+        }
+        ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
+        List<Property> params4Process = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
+        Map<String, Property> allParamMap = params4Process.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
+
+        List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+        for (Property info : allParam) {
+            if (info.getDirect() == Direct.OUT) {
+                String paramName = info.getProp();
+                Property property = allParamMap.get(paramName);
+                if (property == null) {
+                    continue;
+                }
+                String value = row.get(paramName);
+                if (StringUtils.isNotEmpty(value)) {
+                    property.setValue(value);
+                    info.setValue(value);
+                }
+            }
+        }
+        taskParams.put(LOCAL_PARAMS, allParam);
+        taskNode.setParams(JSONUtils.toJsonString(taskParams));
+        // task instance node json
+        taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+        String params4ProcessString = JSONUtils.toJsonString(params4Process);
+        int updateCount = this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId());
+        logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
+    }
+
+    public List<Map<String, String>> getListMapByString(String json) {
+        List<Map<String, String>> allParams = new ArrayList<>();
+        ArrayNode paramsByJson = JSONUtils.parseArray(json);
+        Iterator<JsonNode> listIterator = paramsByJson.iterator();
+        while (listIterator.hasNext()) {
+            Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
+            allParams.add(param);
+        }
+        return allParams;
+    }
+
     /**
      * convert integer list to string list
      *
@@ -1642,7 +1702,7 @@ public class ProcessService {
      * update process instance state by id
      *
      * @param processInstanceId processInstanceId
-     * @param executionStatus executionStatus
+     * @param executionStatus   executionStatus
      * @return update process result
      */
     public int updateProcessInstanceState(Integer processInstanceId, 
ExecutionStatus executionStatus) {
@@ -1679,7 +1739,7 @@ public class ProcessService {
     /**
      * find tenant code by resource name
      *
-     * @param resName resource name
+     * @param resName      resource name
      * @param resourceType resource type
      * @return tenant code
      */
@@ -1703,9 +1763,9 @@ public class ProcessService {
     /**
      * get dependency cycle by work process define id and scheduler fire time
      *
-     * @param masterId masterId
+     * @param masterId            masterId
      * @param processDefinitionId processDefinitionId
-     * @param scheduledFireTime the time the task schedule is expected to 
trigger
+     * @param scheduledFireTime   the time the task schedule is expected to 
trigger
      * @return CycleDependency
      * @throws Exception if error throws Exception
      */
@@ -1718,8 +1778,8 @@ public class ProcessService {
     /**
      * get dependency cycle list by work process define id list and scheduler 
fire time
      *
-     * @param masterId masterId
-     * @param ids ids
+     * @param masterId          masterId
+     * @param ids               ids
      * @param scheduledFireTime the time the task schedule is expected to 
trigger
      * @return CycleDependency list
      * @throws Exception if error throws Exception
@@ -1814,8 +1874,8 @@ public class ProcessService {
      * find last running process instance
      *
      * @param definitionId process definition id
-     * @param startTime start time
-     * @param endTime end time
+     * @param startTime    start time
+     * @param endTime      end time
      * @return process instance
      */
     public ProcessInstance findLastRunningProcess(int definitionId, Date 
startTime, Date endTime) {
@@ -1915,7 +1975,7 @@ public class ProcessService {
     /**
      * list unauthorized udf function
      *
-     * @param userId user id
+     * @param userId     user id
      * @param needChecks data source id array
      * @return unauthorized udf function list
      */
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 551c9bb..7eec366 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -442,4 +442,26 @@ public class ProcessServiceTest {
         Assert.assertEquals(expect, 
processService.changeJson(newProcessData,oldJson));
 
     }
+
+    @Test
+    public void testChangeOutParam() { // Calls processService.changeOutParam for a task that declares OUT parameter d. NOTE(review): the method contains no assertion or Mockito.verify, so it only fails if the call throws.
+        String result = "[{\"d\":\"20210203\"}]"; // result payload passed to changeOutParam: a JSON array with one object, d = 20210203
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setProcessInstanceId(62); // same id as the ProcessInstance created below
+        
taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,"
+                + "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo 
20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
+                + 
"\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+                + 
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,"
+                + 
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"); // task JSON whose localParams hold a single entry: prop d, direct OUT, empty value
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(62);
+        
processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+                + 
"{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); // global params before the call: sql2, out and d, all with empty values
+        String params4ProcessString = 
"[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+                + 
"{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]"; // same three global params, but d now carries 20210203; used only as the stub argument below
+        
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance); // mapper lookup for id 62 returns the instance built above
+        
Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString,
 processInstance.getId())).thenReturn(1); // stub is keyed on this exact JSON string and id; NOTE(review): nothing checks that the update was actually invoked with them
+        processService.changeOutParam(result,taskInstance);
+    }
+
 }

Reply via email to