wen-hemin commented on a change in pull request #4743:
URL:
https://github.com/apache/incubator-dolphinscheduler/pull/4743#discussion_r578103352
##########
File path:
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
##########
@@ -221,4 +221,7 @@ ProcessInstance
queryLastManualProcess(@Param("processDefinitionId") int definit
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
+ int updateGlobalParamById(
Review comment:
GlobalParam -> GlobalParams
##########
File path:
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
##########
@@ -68,7 +68,10 @@ public TaskExecuteResponseCommand(int taskInstanceId) {
* varPool string
*/
private String varPool;
-
+ /**
+ * task return result
+ */
+ private String result;
public void setVarPool(String varPool) {
Review comment:
Add a blank line between the new field and the following method.
##########
File path:
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -686,11 +691,6 @@ private ProcessInstance constructProcessInstance(Command
command, String host) {
} else {
processInstance =
this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
-
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
Review comment:
Why is it deleted here?
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
##########
@@ -540,13 +546,53 @@ private TaskInstance createTaskInstance(ProcessInstance
processInstance, String
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
-
+ //get process global
+ setProcessGlobal(taskNode, taskInstance);
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
+ private void setProcessGlobal(TaskNode taskNode, TaskInstance
taskInstance) {
+ String globalParams = this.processInstance.getGlobalParams();
+ if (StringUtils.isNotEmpty(globalParams)) {
+ Map<String, String> globalMap = getGlobalParamMap(globalParams);
+ if (globalMap != null) {
Review comment:
Also check that globalMap.size() != 0.
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
##########
@@ -84,6 +85,11 @@
*/
protected final List<String> logBuffer;
+ /**
+ * SHELL result string
+ */
+ protected String resultString;
Review comment:
resultString is inconsistent with the previous name
##########
File path:
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -161,10 +166,10 @@
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
*
- * @param logger logger
- * @param host host
+ * @param logger logger
Review comment:
Formatting error
##########
File path:
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
##########
@@ -219,5 +219,9 @@
</foreach>
order by id asc
</select>
-
+ <update id="updateGlobalParamById">
Review comment:
GlobalParam -> GlobalParams
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
##########
@@ -952,6 +998,7 @@ private void runProcess() {
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance.setVarPool(task.getVarPool());
+ processInstance =
processService.findProcessInstanceById(processInstance.getId());
Review comment:
Querying the database here replaces processInstance with a fresh copy, so the
setVarPool call on the previous line will not take effect.
##########
File path:
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -1531,25 +1531,85 @@ public int updateProcessInstance(Integer
processInstanceId, String processJson,
/**
* change task state
*
- * @param state state
- * @param endTime endTime
+ * @param state state
+ * @param endTime endTime
* @param taskInstId taskInstId
- * @param varPool varPool
+ * @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
int processId,
String appIds,
int taskInstId,
- String varPool) {
+ String varPool,
+ String result) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
+ changeOutParam(result, taskInstance);
saveTaskInstance(taskInstance);
}
+ public void changeOutParam(String result, TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(result)) {
+ return;
+ }
+ List<Map<String, String>> workerResultParam =
getListMapByString(result);
+ if (CollectionUtils.isEmpty(workerResultParam)) {
+ return;
+ }
+ //if the result more than one line,just get the first .
+ Map<String, String> row = workerResultParam.get(0);
+ if (row == null || row.size() == 0) {
+ return;
+ }
+ TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(),
TaskNode.class);
+ Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(),
String.class, Object.class);
+ Object localParams = taskParams.get(LOCAL_PARAMS);
+ if (localParams == null) {
+ return;
+ }
+ ProcessInstance processInstance =
this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
+ List<Property> params4Process =
JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Review comment:
params4Process -> params4Property
##########
File path:
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -1531,25 +1531,85 @@ public int updateProcessInstance(Integer
processInstanceId, String processJson,
/**
* change task state
*
- * @param state state
- * @param endTime endTime
+ * @param state state
+ * @param endTime endTime
* @param taskInstId taskInstId
- * @param varPool varPool
+ * @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
int processId,
String appIds,
int taskInstId,
- String varPool) {
+ String varPool,
+ String result) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
+ changeOutParam(result, taskInstance);
saveTaskInstance(taskInstance);
}
+ public void changeOutParam(String result, TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(result)) {
+ return;
+ }
+ List<Map<String, String>> workerResultParam =
getListMapByString(result);
Review comment:
Why is it not a Map here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org