This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.0.0-beta-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 104f67d30689007278a6bb9f2e050521798258e6
Author: Tq <[email protected]>
AuthorDate: Wed May 11 18:37:03 2022 +0800

    [Bug] [MASTER-9811]fix cmd param to overwrite global param when executing 
complement (#9952)
    
    * fix cmd param to overwrite global param when executing complement
    
    * fix cmd param to overwrite global param when executing complement
    
    (cherry picked from commit d4aeee16e5f3f67e37a31fa606acf0b313189655)
---
 .../master/runner/WorkflowExecuteThread.java       | 38 ++++++++++++++++++++++
 .../service/process/ProcessServiceImpl.java        |  5 +--
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index e4f09e5a9c..98556dc0f0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -24,6 +24,8 @@ import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static 
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -942,6 +944,9 @@ public class WorkflowExecuteThread {
         if (processInstance.isComplementData() && complementListDate.size() == 
0) {
             Map<String, String> cmdParam = 
JSONUtils.toMap(processInstance.getCommandParam());
             if (cmdParam != null && 
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+                // reset global params while there are start parameters
+                setGlobalParamIfCommanded(processDefinition, cmdParam);
+
                 Date start = 
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
                 Date end = 
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
                 List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@@ -1960,4 +1965,37 @@ public class WorkflowExecuteThread {
         }
     }
 
+    /**
+     * Overlays the start parameters carried by a command onto the global parameters
+     * of the given process definition.
+     *
+     * <p>Start params and father params are both read from {@code cmdParam}; when a key
+     * appears in both, the father param value wins because it is merged last. Keys that
+     * already exist in the global param map have their value replaced; keys that do not
+     * exist are added to the map and appended to the global param list as IN/VARCHAR
+     * properties. Nothing is changed when there are no start params or when the process
+     * definition has no global param map.
+     *
+     * @param processDefinition process definition whose global params are updated in place
+     * @param cmdParam          parsed command parameters, must not be null
+     */
+    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+        // get start params from command param
+        Map<String, String> startParamMap = new HashMap<>();
+        if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+            String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+            Map<String, String> parsedStartParams = JSONUtils.toMap(startParamJson);
+            // the parse result is treated as nullable elsewhere in this class; keep the empty default then
+            if (parsedStartParams != null) {
+                startParamMap = parsedStartParams;
+            }
+        }
+        if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+            String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+            Map<String, String> fatherParamMap = JSONUtils.toMap(fatherParamJson);
+            // merged last so that father params override start params with the same key
+            if (fatherParamMap != null) {
+                startParamMap.putAll(fatherParamMap);
+            }
+        }
+        // set start param into global params
+        Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+        List<Property> globalParamList = processDefinition.getGlobalParamList();
+        if (startParamMap.size() > 0 && globalMap != null) {
+            //start param to overwrite global param
+            for (Map.Entry<String, String> param : globalMap.entrySet()) {
+                String val = startParamMap.get(param.getKey());
+                if (val != null) {
+                    param.setValue(val);
+                }
+            }
+            //start param to create new global param if global not exist
+            for (Map.Entry<String, String> startParam : startParamMap.entrySet()) {
+                if (!globalMap.containsKey(startParam.getKey())) {
+                    globalMap.put(startParam.getKey(), startParam.getValue());
+                    globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
+                }
+            }
+        }
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 53c20a5ae8..5136891280 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -818,14 +818,15 @@ public class ProcessServiceImpl implements ProcessService 
{
         // set start param into global params
         Map<String, String> globalMap = processDefinition.getGlobalParamMap();
         List<Property> globalParamList = 
processDefinition.getGlobalParamList();
-        if (startParamMap.size() > 0
-            && globalMap != null) {
+        if (startParamMap.size() > 0 && globalMap != null) {
+            //start param to overwrite global param
             for (Map.Entry<String, String> param : globalMap.entrySet()) {
                 String val = startParamMap.get(param.getKey());
                 if (val != null) {
                     param.setValue(val);
                 }
             }
+            //start param to create new global param if global not exist
             for (Entry<String, String> startParam : startParamMap.entrySet()) {
                 if (!globalMap.containsKey(startParam.getKey())) {
                     globalMap.put(startParam.getKey(), startParam.getValue());

Reply via email to