This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b285ccf930 [Future-9396]Support output parameters transfer from parent
workflow to child work flow (#9410)
b285ccf930 is described below
commit b285ccf9306cc5e402a184dd24da82ca7a08689c
Author: caishunfeng <[email protected]>
AuthorDate: Mon Apr 11 20:03:16 2022 +0800
[Future-9396]Support output parameters transfer from parent workflow to
child work flow (#9410)
* [Future-9396]Support output parameters transfer from parent workflow to
child work flow
* fix note
---
.../master/runner/WorkflowExecuteThread.java | 54 ++--
.../service/process/ProcessServiceImpl.java | 313 +++++++++++----------
.../task/api/parameters/AbstractParameters.java | 16 +-
3 files changed, 209 insertions(+), 174 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 9c1aa90c79..75a621bd43 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.server.master.runner;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -67,8 +71,10 @@ import
org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,13 +92,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
/**
* master exec thread,split dag
@@ -447,6 +450,7 @@ public class WorkflowExecuteThread {
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(),
taskInstance.getId());
+ processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
@@ -1210,6 +1214,10 @@ public class WorkflowExecuteThread {
if (allProperty.size() > 0) {
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
}
+ } else {
+ if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
+ taskInstance.setVarPool(processInstance.getVarPool());
+ }
}
}
@@ -1279,19 +1287,19 @@ public class WorkflowExecuteThread {
taskInstances.add(task);
}
//the end node of the branch of the dag
- if (StringUtils.isNotEmpty(parentNodeCode) &&
dag.getEndNode().contains(parentNodeCode)){
+ if (StringUtils.isNotEmpty(parentNodeCode) &&
dag.getEndNode().contains(parentNodeCode)) {
TaskInstance endTaskInstance =
taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool();
- if(StringUtils.isNotEmpty(taskInstanceVarPool)) {
+ if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties =
JSONUtils.toList(taskInstanceVarPool, Property.class)
- .stream().collect(Collectors.toSet());
+ .stream().collect(Collectors.toSet());
String processInstanceVarPool = processInstance.getVarPool();
if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties =
JSONUtils.toList(processInstanceVarPool, Property.class)
- .stream().collect(Collectors.toSet());
+ .stream().collect(Collectors.toSet());
properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties));
- }else{
+ } else {
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
}
}
@@ -1637,7 +1645,7 @@ public class WorkflowExecuteThread {
stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
-// this.processStateChangeHandler(stateEvent);
+ // this.processStateChangeHandler(stateEvent);
// replace with `stateEvents`, make sure `WorkflowExecuteThread`
can be deleted to avoid memory leaks
this.stateEvents.add(stateEvent);
}
@@ -1650,10 +1658,10 @@ public class WorkflowExecuteThread {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
logger.info(
- "work flow process instance [id: {}, name:{}], state
change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
+ "work flow process instance [id: {}, name:{}], state change
from {} to {}, cmd type: {}",
+ processInstance.getId(), processInstance.getName(),
+ processInstance.getState(), state,
+ processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index e6d03a8aaa..7fee2416df 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,13 +17,18 @@
package org.apache.dolphinscheduler.service.process;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -124,11 +129,10 @@ import
org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -143,17 +147,16 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.stream.Collectors.toSet;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
/**
* process relative dao that some mappers in this.
@@ -163,12 +166,12 @@ public class ProcessServiceImpl implements ProcessService
{
private final Logger logger = LoggerFactory.getLogger(getClass());
- private final int[] stateArray = new
int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.DISPATCH.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
+ private final int[] stateArray = new int[]
{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserMapper userMapper;
@@ -309,7 +312,7 @@ public class ProcessServiceImpl implements ProcessService {
if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE,
processInstance.getId());
+ Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
@@ -322,14 +325,14 @@ public class ProcessServiceImpl implements ProcessService
{
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard())
{
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
}
} else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
@@ -342,7 +345,7 @@ public class ProcessServiceImpl implements ProcessService {
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
- info.getId(), 0, info.getState(),
info.getId(), 0
+ info.getId(), 0, info.getState(), info.getId(), 0
);
try {
stateEventCallbackService.sendResult(address,
port, stateEventChangeCommand.convert2Command());
@@ -662,21 +665,21 @@ public class ProcessServiceImpl implements ProcessService
{
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
- CommandType.RECOVER_WAITING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getCode(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getEnvironmentCode(),
- processInstance.getProcessInstancePriority(),
- processInstance.getDryRun(),
- processInstance.getId(),
- processInstance.getProcessDefinitionVersion()
+ CommandType.RECOVER_WAITING_THREAD,
+ processInstance.getTaskDependType(),
+ processInstance.getFailureStrategy(),
+ processInstance.getExecutorId(),
+ processInstance.getProcessDefinition().getCode(),
+ JSONUtils.toJsonString(cmdParam),
+ processInstance.getWarningType(),
+ processInstance.getWarningGroupId(),
+ processInstance.getScheduleTime(),
+ processInstance.getWorkerGroup(),
+ processInstance.getEnvironmentCode(),
+ processInstance.getProcessInstancePriority(),
+ processInstance.getDryRun(),
+ processInstance.getId(),
+ processInstance.getProcessDefinitionVersion()
);
saveCommand(command);
return;
@@ -708,8 +711,8 @@ public class ProcessServiceImpl implements ProcessService {
private Date getScheduleTime(Command command, Map<String, String>
cmdParam) {
Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null
- && cmdParam != null
- && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ && cmdParam != null
+ && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
@@ -720,7 +723,7 @@ public class ProcessServiceImpl implements ProcessService {
scheduleTime = complementDateList.get(0);
} else {
logger.error("set scheduler time error: complement date list
is empty, command: {}",
- command.toString());
+ command.toString());
}
}
return scheduleTime;
@@ -769,10 +772,10 @@ public class ProcessServiceImpl implements ProcessService
{
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime()));
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -799,7 +802,7 @@ public class ProcessServiceImpl implements ProcessService {
startParamMap.putAll(fatherParamMap);
// set start param into global params
if (startParamMap.size() > 0
- && processDefinition.getGlobalParamMap() != null) {
+ && processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param :
processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
@@ -863,8 +866,8 @@ public class ProcessServiceImpl implements ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String>
cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY ||
command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
- ||
cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
+ || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
+ || cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes
is null ", command.getTaskDependType());
return false;
}
@@ -908,10 +911,10 @@ public class ProcessServiceImpl implements ProcessService
{
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ commandTypeIfComplement,
+ processInstance.getScheduleTime()));
processInstance.setProcessDefinition(processDefinition);
}
//reset command parameter
@@ -954,7 +957,7 @@ public class ProcessServiceImpl implements ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA,
convertIntListToString(failedList)));
+ String.join(Constants.COMMA,
convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@@ -967,7 +970,7 @@ public class ProcessServiceImpl implements ProcessService {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList =
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList =
findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
+ ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for (Integer taskId : suspendedNodeList) {
// initialize the pause state
@@ -1044,7 +1047,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
return processDefineLogMapper.queryByDefinitionCodeAndVersion(
- processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
}
}
@@ -1086,13 +1089,13 @@ public class ProcessServiceImpl implements
ProcessService {
List<Date> complementDate = CronUtils.getSelfFireDateList(start, end,
listSchedules);
if (complementDate.size() > 0
- && Flag.NO == processInstance.getIsSubProcess()) {
+ && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0));
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
}
/**
@@ -1111,7 +1114,7 @@ public class ProcessServiceImpl implements ProcessService
{
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
- &&
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+ &&
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
paramMap.remove(CMD_PARAM_SUB_PROCESS);
paramMap.put(CMD_PARAM_SUB_PROCESS,
String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -1123,8 +1126,8 @@ public class ProcessServiceImpl implements ProcessService
{
if (StringUtils.isNotEmpty(parentInstanceId)) {
ProcessInstance parentInstance =
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
- subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(),
subProcessInstance.getGlobalParams()));
+
subProcessInstance.setGlobalParams(joinGlobalParams(parentInstance.getGlobalParams(),
subProcessInstance.getGlobalParams()));
+
subProcessInstance.setVarPool(joinVarPool(parentInstance.getVarPool(),
subProcessInstance.getVarPool()));
this.saveProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find
parent instance: {} ", cmdParam);
@@ -1162,11 +1165,31 @@ public class ProcessServiceImpl implements
ProcessService {
// e.g. the subProp's type is not equals with parent, or
subProp's direct is not equals with parent
// It's suggested to add node name in property, this kind of
problem can be solved.
List<Property> extraSubParams = subParams.stream()
- .filter(subProp ->
!parentParamKeys.contains(subProp.getProp())).collect(Collectors.toList());
+ .filter(subProp ->
!parentParamKeys.contains(subProp.getProp())).collect(Collectors.toList());
parentParams.addAll(extraSubParams);
return JSONUtils.toJsonString(parentParams);
}
+ /**
+ * join parent var pool params into sub process.
+ * only the keys doesn't in sub process global would be joined.
+ *
+ * @param parentValPool
+ * @param subValPool
+ * @return
+ */
+ private String joinVarPool(String parentValPool, String subValPool) {
+ List<Property> parentValPools =
Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class));
+ parentValPools = parentValPools.stream().filter(valPool ->
valPool.getDirect() == Direct.OUT).collect(Collectors.toList());
+
+ List<Property> subValPools =
Lists.newArrayList(JSONUtils.toList(subValPool, Property.class));
+
+ Set<String> parentValPoolKeys =
parentValPools.stream().map(Property::getProp).collect(toSet());
+ List<Property> extraSubValPools = subValPools.stream().filter(sub ->
!parentValPoolKeys.contains(sub.getProp())).collect(Collectors.toList());
+ parentValPools.addAll(extraSubValPools);
+ return JSONUtils.toJsonString(parentValPools);
+ }
+
/**
* initialize task instance
*
@@ -1175,7 +1198,7 @@ public class ProcessServiceImpl implements ProcessService
{
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()
- && (taskInstance.getState().typeIsCancel() ||
taskInstance.getState().typeIsFailure())) {
+ && (taskInstance.getState().typeIsCancel() ||
taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
@@ -1220,12 +1243,12 @@ public class ProcessServiceImpl implements
ProcessService {
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance,
TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process
id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(),
processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstance(),
processInstance.getState());
return null;
}
@@ -1234,7 +1257,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
logger.info("end submit task to db successfully:{} {} state:{}
complete, instance id:{} state: {} ",
- taskInstance.getId(), taskInstance.getName(), task.getState(),
processInstance.getId(), processInstance.getState());
+ taskInstance.getId(), taskInstance.getName(), task.getState(),
processInstance.getId(), processInstance.getState());
return task;
}
@@ -1292,7 +1315,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
}
logger.info("sub process instance is not found,parent task:{},parent
instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
+ parentTask.getId(), parentProcessInstance.getId());
return null;
}
@@ -1390,21 +1413,21 @@ public class ProcessServiceImpl implements
ProcessService {
String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance, fatherParams);
int subProcessInstanceId = childInstance == null ? 0 :
childInstance.getId();
return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- subProcessDefinition.getCode(),
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- task.getEnvironmentCode(),
- parentProcessInstance.getProcessInstancePriority(),
- parentProcessInstance.getDryRun(),
- subProcessInstanceId,
- subProcessDefinition.getVersion()
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ subProcessDefinition.getCode(),
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ task.getWorkerGroup(),
+ task.getEnvironmentCode(),
+ parentProcessInstance.getProcessInstancePriority(),
+ parentProcessInstance.getDryRun(),
+ subProcessInstanceId,
+ subProcessDefinition.getVersion()
);
}
@@ -1441,7 +1464,7 @@ public class ProcessServiceImpl implements ProcessService
{
*/
private void updateSubProcessDefinitionByParent(ProcessInstance
parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition =
this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
- parentProcessInstance.getProcessDefinitionVersion());
+ parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition =
this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@@ -1460,8 +1483,8 @@ public class ProcessServiceImpl implements ProcessService
{
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance,
ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished()
- || processInstanceState == ExecutionStatus.READY_PAUSE
- || processInstanceState == ExecutionStatus.READY_STOP) {
+ || processInstanceState == ExecutionStatus.READY_PAUSE
+ || processInstanceState == ExecutionStatus.READY_STOP) {
logger.warn("processInstance {} was {}, skip submit task",
processInstance.getProcessDefinitionCode(), processInstanceState);
return null;
}
@@ -1500,10 +1523,10 @@ public class ProcessServiceImpl implements
ProcessService {
// the task already exists in task queue
// return state
if (
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
- || state == ExecutionStatus.DISPATCH
+ state == ExecutionStatus.RUNNING_EXECUTION
+ || state == ExecutionStatus.DELAY_EXECUTION
+ || state == ExecutionStatus.KILL
+ || state == ExecutionStatus.DISPATCH
) {
return state;
}
@@ -1512,7 +1535,7 @@ public class ProcessServiceImpl implements ProcessService
{
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstance.getState() == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance, processInstance)) {
+ || !checkProcessStrategy(taskInstance, processInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1535,7 +1558,7 @@ public class ProcessServiceImpl implements ProcessService
{
for (TaskInstance task : taskInstances) {
if (task.getState() == ExecutionStatus.FAILURE
- && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+ && task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
@@ -1647,8 +1670,8 @@ public class ProcessServiceImpl implements ProcessService
{
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
TaskDefinition taskDefinition = this.findTaskDefinition(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
+ taskInstance.getTaskCode(),
+ taskInstance.getTaskDefinitionVersion());
this.updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
}
@@ -1661,17 +1684,17 @@ public class ProcessServiceImpl implements
ProcessService {
@Override
public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
- taskDefinition.getTaskParams(),
- new TypeReference<Map<String, Object>>() {
- });
+ taskDefinition.getTaskParams(),
+ new TypeReference<Map<String, Object>>() {
+ });
if (taskParameters != null) {
// if contains mainJar field, query resource from database
// Flink, Spark, MR
if (taskParameters.containsKey("mainJar")) {
Object mainJarObj = taskParameters.get("mainJar");
ResourceInfo mainJar = JSONUtils.parseObject(
- JSONUtils.toJsonString(mainJarObj),
- ResourceInfo.class);
+ JSONUtils.toJsonString(mainJarObj),
+ ResourceInfo.class);
ResourceInfo resourceInfo = updateResourceInfo(mainJar);
if (resourceInfo != null) {
taskParameters.put("mainJar", resourceInfo);
@@ -1682,10 +1705,10 @@ public class ProcessServiceImpl implements
ProcessService {
String resourceListStr =
JSONUtils.toJsonString(taskParameters.get("resourceList"));
List<ResourceInfo> resourceInfos =
JSONUtils.toList(resourceListStr, ResourceInfo.class);
List<ResourceInfo> updatedResourceInfos = resourceInfos
- .stream()
- .map(this::updateResourceInfo)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ .stream()
+ .map(this::updateResourceInfo)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
taskParameters.put("resourceList", updatedResourceInfos);
}
// set task parameters
@@ -1716,7 +1739,7 @@ public class ProcessServiceImpl implements ProcessService
{
resourceInfo.setResourceName(resource.getFullName());
if (logger.isInfoEnabled()) {
logger.info("updated resource info {}",
- JSONUtils.toJsonString(resourceInfo));
+ JSONUtils.toJsonString(resourceInfo));
}
}
return resourceInfo;
@@ -1938,7 +1961,7 @@ public class ProcessServiceImpl implements ProcessService
{
public Map<Long, String>
queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return
processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
- Schedule::getWorkerGroup));
+ Schedule::getWorkerGroup));
}
/**
@@ -2002,7 +2025,7 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
+ stateArray);
}
/**
@@ -2105,8 +2128,8 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public ProcessInstance findLastSchedulerProcessInterval(Long
definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -2119,8 +2142,8 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode,
DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -2134,9 +2157,9 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date
startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
- startTime,
- endTime,
- stateArray);
+ startTime,
+ endTime,
+ stateArray);
}
/**
@@ -2384,10 +2407,10 @@ public class ProcessServiceImpl implements
ProcessService {
if (params != null &&
CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
- stream()
- .filter(t -> t.getId() != 0)
- .map(ResourceInfo::getId)
- .collect(Collectors.toSet());
+ stream()
+ .filter(t -> t.getId() != 0)
+ .map(ResourceInfo::getId)
+ .collect(toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
@@ -2420,7 +2443,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
TaskDefinitionLog definitionCodeAndVersion =
taskDefinitionLogMapper
-
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(),
taskDefinitionLog.getVersion());
+ .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(),
taskDefinitionLog.getVersion());
if (definitionCodeAndVersion == null) {
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setCreateTime(now);
@@ -2502,7 +2525,7 @@ public class ProcessServiceImpl implements ProcessService
{
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList)
{
@@ -2547,9 +2570,9 @@ public class ProcessServiceImpl implements ProcessService
{
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
- .stream()
- .map(ProcessTaskRelation::getProcessDefinitionCode)
- .collect(Collectors.toSet());
+ .stream()
+ .map(ProcessTaskRelation::getProcessDefinitionCode)
+ .collect(toSet());
List<ProcessDefinition> processDefinitionList =
processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
@@ -2673,7 +2696,7 @@ public class ProcessServiceImpl implements ProcessService
{
taskDefinitionLogs = genTaskDefineList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap =
taskDefinitionLogs.stream()
- .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
+ .collect(Collectors.toMap(TaskDefinitionLog::getCode,
taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
TaskDefinitionLog taskDefinitionLog =
taskDefinitionLogMap.get(code.getKey());
@@ -2698,8 +2721,8 @@ public class ProcessServiceImpl implements ProcessService
{
taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
taskNode.setTimeout(JSONUtils.toJsonString(new
TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
- taskDefinitionLog.getTimeoutNotifyStrategy(),
- taskDefinitionLog.getTimeout())));
+ taskDefinitionLog.getTimeoutNotifyStrategy(),
+ taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
@@ -2735,7 +2758,7 @@ public class ProcessServiceImpl implements ProcessService
{
@Override
public int updateDqExecuteResultUserId(int taskInstanceId) {
DqExecuteResult dqExecuteResult =
- dqExecuteResultMapper.selectOne(new
QueryWrapper<DqExecuteResult>().eq(TASK_INSTANCE_ID, taskInstanceId));
+ dqExecuteResultMapper.selectOne(new
QueryWrapper<DqExecuteResult>().eq(TASK_INSTANCE_ID, taskInstanceId));
if (dqExecuteResult == null) {
return -1;
}
@@ -2764,15 +2787,15 @@ public class ProcessServiceImpl implements
ProcessService {
@Override
public int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId) {
return dqExecuteResultMapper.delete(
- new QueryWrapper<DqExecuteResult>()
- .eq(TASK_INSTANCE_ID, taskInstanceId));
+ new QueryWrapper<DqExecuteResult>()
+ .eq(TASK_INSTANCE_ID, taskInstanceId));
}
@Override
public int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId) {
return dqTaskStatisticsValueMapper.delete(
- new QueryWrapper<DqTaskStatisticsValue>()
- .eq(TASK_INSTANCE_ID, taskInstanceId));
+ new QueryWrapper<DqTaskStatisticsValue>()
+ .eq(TASK_INSTANCE_ID, taskInstanceId));
}
@Override
@@ -2845,7 +2868,7 @@ public class ProcessServiceImpl implements ProcessService
{
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup =
taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount =
taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
taskGroupQueue.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
@@ -2886,7 +2909,7 @@ public class ProcessServiceImpl implements ProcessService
{
}
try {
while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize()
- , thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
+ , thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
thisTaskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() ==
TaskGroupQueueStatus.RELEASE) {
return null;
@@ -2899,13 +2922,13 @@ public class ProcessServiceImpl implements
ProcessService {
logger.info("updateTask:{}", taskInstance.getName());
changeTaskGroupQueueStatus(taskInstance.getId(),
TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue =
this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(),
Flag.NO.getCode());
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
taskGroupQueue =
this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
Flag.NO.getCode(), Flag.NO.getCode());
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
@@ -2971,7 +2994,7 @@ public class ProcessServiceImpl implements ProcessService
{
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
TaskEventChangeCommand taskEventChangeCommand = new
TaskEventChangeCommand(
- processInstance.getId(), taskId
+ processInstance.getId(), taskId
);
stateEventCallbackService.sendResult(address, port,
taskEventChangeCommand.convert2Command(taskType));
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
index 743fe1b945..c556925f90 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
@@ -77,7 +77,6 @@ public abstract class AbstractParameters implements
IParameters {
public Map<String, Property> getLocalParametersMap() {
Map<String, Property> localParametersMaps = new LinkedHashMap<>();
if (localParams != null) {
-
for (Property property : localParams) {
localParametersMaps.put(property.getProp(),property);
}
@@ -113,14 +112,14 @@ public abstract class AbstractParameters implements
IParameters {
}
public void dealOutParam(String result) {
- if
(org.apache.commons.collections4.CollectionUtils.isEmpty(localParams)) {
+ if (CollectionUtils.isEmpty(localParams)) {
return;
}
List<Property> outProperty = getOutProperty(localParams);
- if
(org.apache.commons.collections4.CollectionUtils.isEmpty(outProperty)) {
+ if (CollectionUtils.isEmpty(outProperty)) {
return;
}
- if (org.apache.dolphinscheduler.spi.utils.StringUtils.isEmpty(result))
{
+ if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty);
return;
}
@@ -130,9 +129,9 @@ public abstract class AbstractParameters implements
IParameters {
}
for (Property info : outProperty) {
String propValue = taskResult.get(info.getProp());
- if
(org.apache.dolphinscheduler.spi.utils.StringUtils.isNotEmpty(propValue)) {
+ if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue);
- varPool.add(info);
+ addPropertyToValPool(info);
}
}
}
@@ -180,4 +179,9 @@ public abstract class AbstractParameters implements
IParameters {
public ResourceParametersHelper getResources() {
return new ResourceParametersHelper();
}
+
+ private void addPropertyToValPool(Property property) {
+ varPool.removeIf(p -> p.getProp().equals(property.getProp()));
+ varPool.add(property);
+ }
}