wangxj3 commented on a change in pull request #5603:
URL: https://github.com/apache/dolphinscheduler/pull/5603#discussion_r655047035
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
##########
@@ -518,42 +518,61 @@ private TaskInstance createTaskInstance(ProcessInstance
processInstance, TaskNod
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
-
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
+
+ //get pre task ,get all the task varPool to this task
+ Set<String> preTask = dag.getPreviousNodes(taskInstance.getName());
+ getPreVarPool(taskInstance, preTask);
return taskInstance;
}
- private String globalParamToTaskParams(String params) {
- String globalParams = this.processInstance.getGlobalParams();
- if (StringUtils.isBlank(globalParams)) {
- return params;
- }
- Map<String, String> globalMap =
processService.getGlobalParamMap(globalParams);
- if (globalMap == null || globalMap.size() == 0) {
- return params;
- }
- // the process global param save in localParams
- Map<String, Object> result = JSONUtils.toMap(params, String.class,
Object.class);
- Object localParams = result.get(LOCAL_PARAMS);
- if (localParams != null) {
- List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- for (Property info : allParam) {
- String paramName = info.getProp();
- if (StringUtils.isNotEmpty(paramName) &&
propToValue.containsKey(paramName)) {
- info.setValue((String) propToValue.get(paramName));
- }
- if (info.getDirect().equals(Direct.IN)) {
- String value = globalMap.get(paramName);
- if (StringUtils.isNotEmpty(value)) {
- info.setValue(value);
+ public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask)
{
+ Map<String,Property> allProperty = new HashMap<>();
+ Map<String,TaskInstance> allTaskInstance = new HashMap<>();
+ if (CollectionUtils.isNotEmpty(preTask)) {
+ for (String preTaskName : preTask) {
+ TaskInstance preTaskInstance =
completeTaskList.get(preTaskName);
+ String preVarPool = preTaskInstance.getVarPool();
+ if (StringUtils.isNotEmpty(preVarPool)) {
+ List<Property> properties = JSONUtils.toList(preVarPool,
Property.class);
+ for (Property info : properties) {
+ //for this taskInstance all the param in this part is
IN.
+ info.setDirect(Direct.IN);
+ //get the pre taskInstance Property's name
+ String proName = info.getProp();
+ //if the Previous nodes have the Property of same name
+ if (allProperty.containsKey(proName)) {
+ //comparison the value of two Property
+ Property otherPro = allProperty.get(proName);
+ //if this property'value of loop is empty,use the
other,whether the other's value is empty or not
+ if (StringUtils.isEmpty(info.getValue())) {
+ allProperty.put(proName, otherPro);
+ //if property'value of loop is not empty,and
the other's value is not empty too, use the earlier value
+ } else if
(StringUtils.isNotEmpty(otherPro.getValue())) {
+ TaskInstance otherTask =
allTaskInstance.get(proName);
+ if (otherTask.getEndTime().getTime() >
preTaskInstance.getEndTime().getTime()) {
+ allProperty.put(proName, info);
+
allTaskInstance.put(proName,preTaskInstance);
+ } else {
+ allProperty.put(proName, otherPro);
+ }
+ } else {
+ allProperty.put(proName, info);
+ allTaskInstance.put(proName,preTaskInstance);
+ }
+ } else {
+ allProperty.put(proName, info);
+ allTaskInstance.put(proName,preTaskInstance);
+ }
}
}
}
- result.put(LOCAL_PARAMS, allParam);
+ if (allProperty.size() > 0) {
+
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
+ }
}
- return JSONUtils.toJsonString(result);
}
private void submitPostNode(String parentNodeName) {
Review comment:
>
>
> Please comfirm if it is reasonable:
> VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool())
This part is useless ,delete it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]