This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d4aeee16e5 [Bug] [MASTER-9811]fix cmd param to overwrite global param
when executing complement (#9952)
d4aeee16e5 is described below
commit d4aeee16e5f3f67e37a31fa606acf0b313189655
Author: Tq <[email protected]>
AuthorDate: Wed May 11 18:37:03 2022 +0800
[Bug] [MASTER-9811]fix cmd param to overwrite global param when executing
complement (#9952)
* fix cmd param to overwrite global param when executing complement
* fix cmd param to overwrite global param when executing complement
---
.../master/runner/WorkflowExecuteThread.java | 38 ++++++++++++++++++++++
.../service/process/ProcessServiceImpl.java | 5 +--
2 files changed, 41 insertions(+), 2 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index e4f09e5a9c..98556dc0f0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -24,6 +24,8 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -942,6 +944,9 @@ public class WorkflowExecuteThread {
if (processInstance.isComplementData() && complementListDate.size() ==
0) {
Map<String, String> cmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam != null &&
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ // reset global params while there are start parameters
+ setGlobalParamIfCommanded(processDefinition, cmdParam);
+
Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@@ -1960,4 +1965,37 @@ public class WorkflowExecuteThread {
}
}
+ private void setGlobalParamIfCommanded(ProcessDefinition
processDefinition, Map<String, String> cmdParam) {
+ // get start params from command param
+ Map<String, String> startParamMap = new HashMap<>();
+ if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+ String startParamJson =
cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+ startParamMap = JSONUtils.toMap(startParamJson);
+ }
+ Map<String, String> fatherParamMap = new HashMap<>();
+ if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+ String fatherParamJson =
cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+ fatherParamMap = JSONUtils.toMap(fatherParamJson);
+ }
+ startParamMap.putAll(fatherParamMap);
+ // set start param into global params
+ Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+ List<Property> globalParamList =
processDefinition.getGlobalParamList();
+ if (startParamMap.size() > 0 && globalMap != null) {
+ //start param to overwrite global param
+ for (Map.Entry<String, String> param : globalMap.entrySet()) {
+ String val = startParamMap.get(param.getKey());
+ if (val != null) {
+ param.setValue(val);
+ }
+ }
+ //start param to create new global param if global not exist
+ for (Map.Entry<String, String> startParam :
startParamMap.entrySet()) {
+ if (!globalMap.containsKey(startParam.getKey())) {
+ globalMap.put(startParam.getKey(), startParam.getValue());
+ globalParamList.add(new Property(startParam.getKey(), IN,
VARCHAR, startParam.getValue()));
+ }
+ }
+ }
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 53c20a5ae8..5136891280 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -818,14 +818,15 @@ public class ProcessServiceImpl implements ProcessService
{
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList =
processDefinition.getGlobalParamList();
- if (startParamMap.size() > 0
- && globalMap != null) {
+ if (startParamMap.size() > 0 && globalMap != null) {
+ //start param to overwrite global param
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
+ //start param to create new global param if global not exist
for (Entry<String, String> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());