This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 29d42fd  [Feature-#3805][server-master] Task parameter transfer (#5077)
29d42fd is described below

commit 29d42fd92d6720a8a0641e37923c6e6f38a5ae85
Author: wangxj3 <[email protected]>
AuthorDate: Thu Mar 18 11:50:10 2021 +0800

    [Feature-#3805][server-master] Task parameter transfer (#5077)
    
    
    
    * fix out param format bug
    
    Co-authored-by: wangxj <wangxj31>
---
 .../apache/dolphinscheduler/common/Constants.java  |  2 +
 .../server/master/runner/MasterExecThread.java     | 13 +-----
 .../service/process/ProcessService.java            | 46 ++++++++++++++++++----
 3 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index d6d571f..eb71239 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -469,6 +469,8 @@ public final class Constants {
 
     public static final String CMD_PARAM_START_PARAMS = "StartParams";
 
+    public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
+
     /**
      * complement data start date
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 80bd3f7..b7a4d00 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -557,7 +557,7 @@ public class MasterExecThread implements Runnable {
     private void setProcessGlobal(TaskNode taskNode, TaskInstance 
taskInstance) {
         String globalParams = this.processInstance.getGlobalParams();
         if (StringUtils.isNotEmpty(globalParams)) {
-            Map<String, String> globalMap = getGlobalParamMap(globalParams);
+            Map<String, String> globalMap = 
processService.getGlobalParamMap(globalParams);
             if (globalMap != null && globalMap.size() != 0) {
                 setGlobalMapToTask(taskNode, taskInstance, globalMap);
             }
@@ -586,17 +586,6 @@ public class MasterExecThread implements Runnable {
         }
     }
 
-    public Map<String, String> getGlobalParamMap(String globalParams) {
-        List<Property> propList;
-        Map<String,String> globalParamMap = new HashMap<>();
-        if (StringUtils.isNotEmpty(globalParams)) {
-            propList = JSONUtils.toList(globalParams, Property.class);
-            globalParamMap = 
propList.stream().collect(Collectors.toMap(Property::getProp, 
Property::getValue));
-        }
-
-        return globalParamMap;
-    }
-
     private void submitPostNode(String parentNodeName) {
         Set<String> submitTaskNodeList = 
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, 
completeTaskList);
         List<TaskInstance> taskInstances = new ArrayList<>();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a759c24..fe73eae 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
@@ -586,14 +587,19 @@ public class ProcessService {
 
     private void setGlobalParamIfCommanded(ProcessDefinition 
processDefinition, Map<String, String> cmdParam) {
         // get start params from command param
-        Map<String, String> startParamMap = null;
+        Map<String, String> startParamMap = new HashMap<>();
         if (cmdParam != null && 
cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
             String startParamJson = 
cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
             startParamMap = JSONUtils.toMap(startParamJson);
         }
-
+        Map<String, String> fatherParamMap = new HashMap<>();
+        if (cmdParam != null && 
cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+            String fatherParamJson = 
cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+            fatherParamMap = JSONUtils.toMap(fatherParamJson);
+        }
+        startParamMap.putAll(fatherParamMap);
         // set start param into global params
-        if (startParamMap != null && startParamMap.size() > 0
+        if (startParamMap.size() > 0
                 && processDefinition.getGlobalParamMap() != null) {
             for (Map.Entry<String, String> param : 
processDefinition.getGlobalParamMap().entrySet()) {
                 String val = startParamMap.get(param.getKey());
@@ -1065,7 +1071,7 @@ public class ProcessService {
     /**
      * complement data needs transform parent parameter to child.
      */
-    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, 
ProcessInstance parentProcessInstance) {
+    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, 
ProcessInstance parentProcessInstance,Map<String,String> fatherParams) {
         // set sub work process command
         String processMapStr = JSONUtils.toJsonString(instanceMap);
         Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
@@ -1077,9 +1083,24 @@ public class ProcessService {
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
             processMapStr = JSONUtils.toJsonString(cmdParam);
         }
+        if (fatherParams.size() != 0) {
+            cmdParam.put(CMD_PARAM_FATHER_PARAMS, 
JSONUtils.toJsonString(fatherParams));
+            processMapStr = JSONUtils.toJsonString(cmdParam);
+        }
         return processMapStr;
     }
 
+    public Map<String, String> getGlobalParamMap(String globalParams) {
+        List<Property> propList;
+        Map<String, String> globalParamMap = new HashMap<>();
+        if (StringUtils.isNotEmpty(globalParams)) {
+            propList = JSONUtils.toList(globalParams, Property.class);
+            globalParamMap = 
propList.stream().collect(Collectors.toMap(Property::getProp, 
Property::getValue));
+        }
+
+        return globalParamMap;
+    }
+
     /**
      * create sub work process command
      */
@@ -1089,9 +1110,18 @@ public class ProcessService {
                                            TaskInstance task) {
         CommandType commandType = getSubCommandType(parentProcessInstance, 
childInstance);
         TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), 
TaskNode.class);
-        Map<String, String> subProcessParam = 
JSONUtils.toMap(taskNode.getParams());
-        Integer childDefineId = 
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
-        String processParam = getSubWorkFlowParam(instanceMap, 
parentProcessInstance);
+        Map<String, Object> subProcessParam = 
JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
+        Integer childDefineId = 
Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
+        Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
+        List<Property> allParam = 
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+        Map<String, String> globalMap = 
this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
+        Map<String,String> fatherParams = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(allParam)) {
+            for (Property info : allParam) {
+                fatherParams.put(info.getProp(), 
globalMap.get(info.getProp()));
+            }
+        }
+        String processParam = getSubWorkFlowParam(instanceMap, 
parentProcessInstance,fatherParams);
 
         return new Command(
                 commandType,
@@ -1601,7 +1631,7 @@ public class ProcessService {
                 if (property == null) {
                     continue;
                 }
-                String value = row.get(paramName);
+                String value = String.valueOf(row.get(paramName));
                 if (StringUtils.isNotEmpty(value)) {
                     property.setValue(value);
                     info.setValue(value);

Reply via email to