This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 29d42fd [Feature-#3805][server-master] Task parameter transfer (#5077)
29d42fd is described below
commit 29d42fd92d6720a8a0641e37923c6e6f38a5ae85
Author: wangxj3 <[email protected]>
AuthorDate: Thu Mar 18 11:50:10 2021 +0800
[Feature-#3805][server-master] Task parameter transfer (#5077)
* fix out param format bug
Co-authored-by: wangxj <wangxj31>
---
.../apache/dolphinscheduler/common/Constants.java | 2 +
.../server/master/runner/MasterExecThread.java | 13 +-----
.../service/process/ProcessService.java | 46 ++++++++++++++++++----
3 files changed, 41 insertions(+), 20 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index d6d571f..eb71239 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -469,6 +469,8 @@ public final class Constants {
public static final String CMD_PARAM_START_PARAMS = "StartParams";
+ public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
+
/**
* complement data start date
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 80bd3f7..b7a4d00 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -557,7 +557,7 @@ public class MasterExecThread implements Runnable {
private void setProcessGlobal(TaskNode taskNode, TaskInstance
taskInstance) {
String globalParams = this.processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
- Map<String, String> globalMap = getGlobalParamMap(globalParams);
+ Map<String, String> globalMap =
processService.getGlobalParamMap(globalParams);
if (globalMap != null && globalMap.size() != 0) {
setGlobalMapToTask(taskNode, taskInstance, globalMap);
}
@@ -586,17 +586,6 @@ public class MasterExecThread implements Runnable {
}
}
- public Map<String, String> getGlobalParamMap(String globalParams) {
- List<Property> propList;
- Map<String,String> globalParamMap = new HashMap<>();
- if (StringUtils.isNotEmpty(globalParams)) {
- propList = JSONUtils.toList(globalParams, Property.class);
- globalParamMap =
propList.stream().collect(Collectors.toMap(Property::getProp,
Property::getValue));
- }
-
- return globalParamMap;
- }
-
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag,
completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a759c24..fe73eae 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
@@ -586,14 +587,19 @@ public class ProcessService {
private void setGlobalParamIfCommanded(ProcessDefinition
processDefinition, Map<String, String> cmdParam) {
// get start params from command param
- Map<String, String> startParamMap = null;
+ Map<String, String> startParamMap = new HashMap<>();
if (cmdParam != null &&
cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
String startParamJson =
cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
-
+ Map<String, String> fatherParamMap = new HashMap<>();
+ if (cmdParam != null &&
cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+ String fatherParamJson =
cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+ fatherParamMap = JSONUtils.toMap(fatherParamJson);
+ }
+ startParamMap.putAll(fatherParamMap);
// set start param into global params
- if (startParamMap != null && startParamMap.size() > 0
+ if (startParamMap.size() > 0
&& processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param :
processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
@@ -1065,7 +1071,7 @@ public class ProcessService {
/**
* complement data needs transform parent parameter to child.
*/
- private String getSubWorkFlowParam(ProcessInstanceMap instanceMap,
ProcessInstance parentProcessInstance) {
+ private String getSubWorkFlowParam(ProcessInstanceMap instanceMap,
ProcessInstance parentProcessInstance,Map<String,String> fatherParams) {
// set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
@@ -1077,9 +1083,24 @@ public class ProcessService {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
processMapStr = JSONUtils.toJsonString(cmdParam);
}
+ if (fatherParams.size() != 0) {
+ cmdParam.put(CMD_PARAM_FATHER_PARAMS,
JSONUtils.toJsonString(fatherParams));
+ processMapStr = JSONUtils.toJsonString(cmdParam);
+ }
return processMapStr;
}
+ public Map<String, String> getGlobalParamMap(String globalParams) {
+ List<Property> propList;
+ Map<String, String> globalParamMap = new HashMap<>();
+ if (StringUtils.isNotEmpty(globalParams)) {
+ propList = JSONUtils.toList(globalParams, Property.class);
+ globalParamMap =
propList.stream().collect(Collectors.toMap(Property::getProp,
Property::getValue));
+ }
+
+ return globalParamMap;
+ }
+
/**
* create sub work process command
*/
@@ -1089,9 +1110,18 @@ public class ProcessService {
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance,
childInstance);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(),
TaskNode.class);
- Map<String, String> subProcessParam =
JSONUtils.toMap(taskNode.getParams());
- Integer childDefineId =
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
- String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance);
+ Map<String, Object> subProcessParam =
JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+ Integer childDefineId =
Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
+ Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
+ List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+ Map<String, String> globalMap =
this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
+ Map<String,String> fatherParams = new HashMap<>();
+ if (CollectionUtils.isNotEmpty(allParam)) {
+ for (Property info : allParam) {
+ fatherParams.put(info.getProp(),
globalMap.get(info.getProp()));
+ }
+ }
+ String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance,fatherParams);
return new Command(
commandType,
@@ -1601,7 +1631,7 @@ public class ProcessService {
if (property == null) {
continue;
}
- String value = row.get(paramName);
+ String value = String.valueOf(row.get(paramName));
if (StringUtils.isNotEmpty(value)) {
property.setValue(value);
info.setValue(value);