This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch params-trans
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/params-trans by this push:
new 87ff13f [Feature-#3805][server-master] global params of master (#4678)
87ff13f is described below
commit 87ff13fc865ffb456e94aa197e367b72239249e8
Author: wangxj3 <[email protected]>
AuthorDate: Sun Feb 7 17:10:33 2021 +0800
[Feature-#3805][server-master] global params of master (#4678)
* global initParam and set Param
* fix dataFormat error
* fix deal outParams bug
* fix code style
* fix code style
* fix code style
* fix code style
* fix code style
* fix code style
* fix code style
* add test
* fix code style (variable name)
* fix reset globalParams bug
Co-authored-by: wangxj <wangxj31>
---
.../dao/mapper/ProcessInstanceMapper.java | 3 +
.../dao/mapper/ProcessInstanceMapper.xml | 6 +-
.../remote/command/TaskExecuteResponseCommand.java | 13 +-
.../master/processor/TaskResponseProcessor.java | 4 +-
.../master/processor/queue/TaskResponseEvent.java | 16 +-
.../processor/queue/TaskResponseService.java | 3 +-
.../server/master/runner/MasterExecThread.java | 51 +++++-
.../processor/queue/TaskResponseServiceTest.java | 3 +-
.../service/process/ProcessService.java | 176 ++++++++++++++-------
.../service/process/ProcessServiceTest.java | 22 +++
10 files changed, 231 insertions(+), 66 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index a9ebbf0..7116dc4 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
+ int updateGlobalParamById(
+ @Param("globalParams") String globalParams,
+ @Param("id") int id);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f661635..793b58e 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -219,5 +219,9 @@
</foreach>
order by id asc
</select>
-
+ <update id="updateGlobalParamById">
+ update t_ds_process_instance
+ set global_params = #{globalParams}
+ where id = #{id}
+ </update>
</mapper>
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index de5b82c..21fe471 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -68,7 +68,10 @@ public class TaskExecuteResponseCommand implements
Serializable {
* varPool string
*/
private String varPool;
-
+ /**
+ * task return result
+ */
+ private String result;
public void setVarPool(String varPool) {
this.varPool = varPool;
}
@@ -139,4 +142,12 @@ public class TaskExecuteResponseCommand implements
Serializable {
+ ", appIds='" + appIds + '\''
+ '}';
}
+
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 080fdd5..186c4f3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -80,7 +80,9 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(),
- channel);
+ channel,
+ responseCommand.getResult()
+ );
taskResponseService.addResponse(taskResponseEvent);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 0ca558a..9789bcc 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -92,6 +92,10 @@ public class TaskResponseEvent {
* channel
*/
private Channel channel;
+ /**
+ * task return result
+ */
+ private String result;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
@@ -118,7 +122,8 @@ public class TaskResponseEvent {
String appIds,
int taskInstanceId,
String varPool,
- Channel channel) {
+ Channel channel,
+ String result) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@@ -128,6 +133,7 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
event.setChannel(channel);
+ event.setResult(result);
return event;
}
@@ -226,4 +232,12 @@ public class TaskResponseEvent {
public void setChannel(Channel channel) {
this.channel = channel;
}
+
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 1b5eddb..f3f2e7f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -165,7 +165,8 @@ public class TaskResponseService {
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
- taskResponseEvent.getVarPool()
+ taskResponseEvent.getVarPool(),
+ taskResponseEvent.getResult()
);
}
// if taskInstance is null (maybe deleted) . retry will be
meaningless . so response success
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 3b113b6..0e6c0d8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -22,11 +22,13 @@ import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -67,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -74,6 +78,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable {
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance,
String nodeName,
TaskNode taskNode) {
-
+ //update processInstance for update the globalParams
+ this.processInstance =
this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
@@ -540,13 +546,53 @@ public class MasterExecThread implements Runnable {
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
-
+ //get process global
+ setProcessGlobal(taskNode, taskInstance);
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
+ private void setProcessGlobal(TaskNode taskNode, TaskInstance
taskInstance) {
+ String globalParams = this.processInstance.getGlobalParams();
+ if (StringUtils.isNotEmpty(globalParams)) {
+ Map<String, String> globalMap = getGlobalParamMap(globalParams);
+ if (globalMap != null) {
+ // the param save in localParams
+ Map<String, Object> result =
JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+ Object localParams = result.get(LOCAL_PARAMS);
+ if (localParams != null) {
+ List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+ for (Property info : allParam) {
+ if (info.getDirect().equals(Direct.IN)) {
+ String paramName = info.getProp();
+ String value = globalMap.get(paramName);
+ if (StringUtils.isNotEmpty(value)) {
+ info.setValue(value);
+ }
+ }
+ }
+ result.put(LOCAL_PARAMS, allParam);
+ taskNode.setParams(JSONUtils.toJsonString(result));
+ // task instance node json
+ taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ }
+ }
+ }
+ }
+
+ public Map<String, String> getGlobalParamMap(String globalParams) {
+ List<Property> propList;
+ Map<String,String> globalParamMap = new HashMap<>();
+ if (StringUtils.isNotEmpty(globalParams)) {
+ propList = JSONUtils.toList(globalParams, Property.class);
+ globalParamMap =
propList.stream().collect(Collectors.toMap(Property::getProp,
Property::getValue));
+ }
+
+ return globalParamMap;
+ }
+
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag,
completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
@@ -952,6 +998,7 @@ public class MasterExecThread implements Runnable {
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance.setVarPool(task.getVarPool());
+ processInstance =
processService.findProcessInstanceById(processInstance.getId());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 5d10f84..ec0807c 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -70,7 +70,8 @@ public class TaskResponseServiceTest {
"ids",
22,
"varPol",
- channel);
+ channel,
+
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
taskInstance = new TaskInstance();
taskInstance.setId(22);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..d77a654 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -24,6 +24,7 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -89,6 +91,7 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -103,6 +106,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cronutils.model.Cron;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -161,10 +166,10 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
*
- * @param logger logger
- * @param host host
+ * @param logger logger
+ * @param host host
* @param validThreadNum validThreadNum
- * @param command found command
+ * @param command found command
* @return process instance
*/
@Transactional(rollbackFor = Exception.class)
@@ -204,7 +209,7 @@ public class ProcessService {
/**
* set process waiting thread
*
- * @param command command
+ * @param command command
* @param processInstance processInstance
* @return process instance
*/
@@ -222,7 +227,7 @@ public class ProcessService {
/**
* check thread num
*
- * @param command command
+ * @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
*/
@@ -425,7 +430,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
- * @param ids ids
+ * @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
ProcessDefinition processDefinition =
processDefineMapper.selectById(parentId);
@@ -456,7 +461,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at
the same time.
* if the recovery command is exists, only update the field update_time
*
- * @param originCommand originCommand
+ * @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand,
ProcessInstance processInstance) {
@@ -508,7 +513,7 @@ public class ProcessService {
/**
* get schedule time from command
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return date
*/
@@ -524,8 +529,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
+ * @param command command
+ * @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition
processDefinition,
@@ -601,7 +606,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
- * @param userId userId
+ * @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -624,7 +629,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@@ -644,7 +649,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
- * @param host host
+ * @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String
host) {
@@ -686,11 +691,6 @@ public class ProcessService {
} else {
processInstance =
this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
-
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
}
processDefinition =
processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@@ -807,7 +807,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
- * @param command command
+ * @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance
processInstance, Command command) {
@@ -822,8 +822,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
+ * @param processInstance processInstance
+ * @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@@ -895,7 +895,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
+ * @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String
subGlobalParams) {
@@ -965,7 +965,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance
parentInstance, TaskInstance parentTask) {
@@ -994,7 +994,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance
parentProcessInstance,
@@ -1020,7 +1020,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
- * @param task task
+ * @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance,
TaskInstance task) {
if (!task.isSubProcess()) {
@@ -1119,7 +1119,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionId childDefinitionId
+ * @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance
parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition =
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@@ -1133,7 +1133,7 @@ public class ProcessService {
/**
* submit task to mysql
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@@ -1187,16 +1187,16 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance,
ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
+ // running, delayed or killed
+ // the task already exists in task queue
+ // return state
if (
- // running, delayed or killed
- // the task already exists in task queue
- // return state
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
@@ -1363,7 +1363,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
- * @param state state
+ * @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId,
ExecutionStatus state) {
@@ -1418,7 +1418,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer
parentWorkProcessId, Integer parentTaskId) {
@@ -1440,7 +1440,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId,
Integer parentTaskId) {
@@ -1472,12 +1472,12 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param startTime startTime
- * @param host host
+ * @param state state
+ * @param startTime startTime
+ * @param host host
* @param executePath executePath
- * @param logPath logPath
- * @param taskInstId taskInstId
+ * @param logPath logPath
+ * @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state, Date startTime, String host,
String executePath,
@@ -1505,12 +1505,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
- * @param processJson processJson
- * @param globalParams globalParams
- * @param scheduleTime scheduleTime
- * @param flag flag
- * @param locations locations
- * @param connects connects
+ * @param processJson processJson
+ * @param globalParams globalParams
+ * @param scheduleTime scheduleTime
+ * @param flag flag
+ * @param locations locations
+ * @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String
processJson,
@@ -1531,25 +1531,85 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param endTime endTime
+ * @param state state
+ * @param endTime endTime
* @param taskInstId taskInstId
- * @param varPool varPool
+ * @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
int processId,
String appIds,
int taskInstId,
- String varPool) {
+ String varPool,
+ String result) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
+ changeOutParam(result, taskInstance);
saveTaskInstance(taskInstance);
}
+ public void changeOutParam(String result, TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(result)) {
+ return;
+ }
+ List<Map<String, String>> workerResultParam =
getListMapByString(result);
+ if (CollectionUtils.isEmpty(workerResultParam)) {
+ return;
+ }
+ //if the result more than one line,just get the first .
+ Map<String, String> row = workerResultParam.get(0);
+ if (row == null || row.size() == 0) {
+ return;
+ }
+ TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
+ Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(),
String.class, Object.class);
+ Object localParams = taskParams.get(LOCAL_PARAMS);
+ if (localParams == null) {
+ return;
+ }
+ ProcessInstance processInstance =
this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
+ List<Property> params4Process =
JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
+ Map<String, Property> allParamMap =
params4Process.stream().collect(Collectors.toMap(Property::getProp, Property ->
Property));
+
+ List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+ for (Property info : allParam) {
+ if (info.getDirect() == Direct.OUT) {
+ String paramName = info.getProp();
+ Property property = allParamMap.get(paramName);
+ if (property == null) {
+ continue;
+ }
+ String value = row.get(paramName);
+ if (StringUtils.isNotEmpty(value)) {
+ property.setValue(value);
+ info.setValue(value);
+ }
+ }
+ }
+ taskParams.put(LOCAL_PARAMS, allParam);
+ taskNode.setParams(JSONUtils.toJsonString(taskParams));
+ // task instance node json
+ taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
+ String params4ProcessString = JSONUtils.toJsonString(params4Process);
+ int updateCount =
this.processInstanceMapper.updateGlobalParamById(params4ProcessString,
processInstance.getId());
+ logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}",
updateCount, params4ProcessString, processInstance.getId());
+ }
+
+ public List<Map<String, String>> getListMapByString(String json) {
+ List<Map<String, String>> allParams = new ArrayList<>();
+ ArrayNode paramsByJson = JSONUtils.parseArray(json);
+ Iterator<JsonNode> listIterator = paramsByJson.iterator();
+ while (listIterator.hasNext()) {
+ Map<String, String> param =
JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
+ allParams.add(param);
+ }
+ return allParams;
+ }
+
/**
* convert integer list to string list
*
@@ -1642,7 +1702,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
+ * @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId,
ExecutionStatus executionStatus) {
@@ -1679,7 +1739,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
- * @param resName resource name
+ * @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@@ -1703,9 +1763,9 @@ public class ProcessService {
/**
* get dependency cycle by work process define id and scheduler fire time
*
- * @param masterId masterId
+ * @param masterId masterId
* @param processDefinitionId processDefinitionId
- * @param scheduledFireTime the time the task schedule is expected to
trigger
+ * @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
@@ -1718,8 +1778,8 @@ public class ProcessService {
/**
* get dependency cycle list by work process define id list and scheduler
fire time
*
- * @param masterId masterId
- * @param ids ids
+ * @param masterId masterId
+ * @param ids ids
* @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
@@ -1814,8 +1874,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionId process definition id
- * @param startTime start time
- * @param endTime end time
+ * @param startTime start time
+ * @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date
startTime, Date endTime) {
@@ -1915,7 +1975,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
- * @param userId user id
+ * @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 551c9bb..7eec366 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -442,4 +442,26 @@ public class ProcessServiceTest {
Assert.assertEquals(expect,
processService.changeJson(newProcessData,oldJson));
}
+
+ @Test
+ public void testChangeOutParam() {
+ String result = "[{\"d\":\"20210203\"}]";
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setProcessInstanceId(62);
+
taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,"
+ + "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo
20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
+ +
"\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+ +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,"
+ +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}");
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(62);
+
processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ +
"{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+ String params4ProcessString =
"[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ +
"{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]";
+
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
+
Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString,
processInstance.getId())).thenReturn(1);
+ processService.changeOutParam(result,taskInstance);
+ }
+
}