ljjjjjjjjjjjjjjjjjjjjj commented on issue #13637:
URL:
https://github.com/apache/dolphinscheduler/issues/13637#issuecomment-1465476783
@Override
@Transactional
public ProcessInstance handleCommand(String host,
Command command) throws
CronParseException, CodeGenerateException {
ProcessInstance processInstance =
**constructProcessInstance**(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}",
command);
moveToErrorCommand(command, "process instance is null");
return null;
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
// if the processDefinition is serial
ProcessDefinition processDefinition =
this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() !=
WorkflowExecutionStatus.SUBMITTED_SUCCESS) {
setSubProcessParam(processInstance);
deleteCommandWithCheck(command.getId());
return null;
}
} else {
processInstanceDao.upsertProcessInstance(processInstance);
}
setSubProcessParam(processInstance);
deleteCommandWithCheck(command.getId());
return processInstance;
}
protected @Nullable ProcessInstance constructProcessInstance(Command
command,
String
host) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code :
{}", command.getProcessDefinitionCode());
throw new IllegalArgumentException("Cannot find the process
definition for this workflowInstance");
}
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
int processInstanceId = command.getProcessInstanceId();
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition,
command, cmdParam);
} else {
processInstance =
this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
}
if (cmdParam != null) { **// this is the point: a RECOVER_SERIAL_WAIT
command does not have the original params, and the global params in the
record are deleted**
CommandType commandTypeIfComplement =
getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
String globalParams =
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
}
// reset command parameter
if (processInstance.getCommandParam() != null) {
Map<String, String> processCmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
processCmdParam.forEach((key, value) -> {
if (!cmdParam.containsKey(key)) {
cmdParam.put(key, value);
}
});
}
// reset command parameter if sub process
if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_PROCESS))
{
processInstance.setCommandParam(command.getCommandParam());
}
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!");
return null;
}
if (command.getScheduleTime() != null) {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
processInstance.setRestartTime(new Date());
WorkflowExecutionStatus runStatus =
WorkflowExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes();
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]