This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch 3.0.0-beta-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 104f67d30689007278a6bb9f2e050521798258e6 Author: Tq <[email protected]> AuthorDate: Wed May 11 18:37:03 2022 +0800 [Bug] [MASTER-9811]fix cmd param to overwrite global param when executing complement (#9952) * fix cmd param to overwrite global param when executing complement * fix cmd param to overwrite global param when executing complement (cherry picked from commit d4aeee16e5f3f67e37a31fa606acf0b313189655) --- .../master/runner/WorkflowExecuteThread.java | 38 ++++++++++++++++++++++ .../service/process/ProcessServiceImpl.java | 5 +-- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index e4f09e5a9c..98556dc0f0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -24,6 +24,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -942,6 +944,9 @@ public class WorkflowExecuteThread { if (processInstance.isComplementData() && complementListDate.size() == 0) { Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + // reset global params while there are start parameters + setGlobalParamIfCommanded(processDefinition, cmdParam); + Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); @@ -1960,4 +1965,37 @@ public class WorkflowExecuteThread { } } + private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) { + // get start params from command param + Map<String, String> startParamMap = new HashMap<>(); + if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { + String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); + startParamMap = JSONUtils.toMap(startParamJson); + } + Map<String, String> fatherParamMap = new HashMap<>(); + if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) { + String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS); + fatherParamMap = JSONUtils.toMap(fatherParamJson); + } + startParamMap.putAll(fatherParamMap); + // set start param into global params + Map<String, String> globalMap = processDefinition.getGlobalParamMap(); + List<Property> globalParamList = processDefinition.getGlobalParamList(); + if (startParamMap.size() > 0 && globalMap != null) { + //start param to overwrite global param + for (Map.Entry<String, String> param : globalMap.entrySet()) { + String val = startParamMap.get(param.getKey()); + if (val != null) { + param.setValue(val); + } + } + //start param to create new global param if global not exist + for (Map.Entry<String, String> startParam : startParamMap.entrySet()) { + if (!globalMap.containsKey(startParam.getKey())) { + globalMap.put(startParam.getKey(), startParam.getValue()); + globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue())); + } + } + } + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 53c20a5ae8..5136891280 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -818,14 +818,15 @@ public class ProcessServiceImpl implements ProcessService { // set start param into global params Map<String, String> globalMap = processDefinition.getGlobalParamMap(); List<Property> globalParamList = processDefinition.getGlobalParamList(); - if (startParamMap.size() > 0 - && globalMap != null) { + if (startParamMap.size() > 0 && globalMap != null) { + //start param to overwrite global param for (Map.Entry<String, String> param : globalMap.entrySet()) { String val = startParamMap.get(param.getKey()); if (val != null) { param.setValue(val); } } + //start param to create new global param if global not exist for (Entry<String, String> startParam : startParamMap.entrySet()) { if (!globalMap.containsKey(startParam.getKey())) { globalMap.put(startParam.getKey(), startParam.getValue());
