ljjjjjjjjjjjjjjjjjjjjj commented on issue #13637:
URL: 
https://github.com/apache/dolphinscheduler/issues/13637#issuecomment-1465476783

       @Override
       @Transactional
       public ProcessInstance handleCommand(String host,
                                            Command command) throws 
CronParseException, CodeGenerateException {
           ProcessInstance processInstance = 
**constructProcessInstance**(command, host);
           // cannot construct process instance, return null
           if (processInstance == null) {
               logger.error("scan command, command parameter is error: {}", 
command);
               moveToErrorCommand(command, "process instance is null");
               return null;
           }
           processInstance.setCommandType(command.getCommandType());
           processInstance.addHistoryCmd(command.getCommandType());
           // if the processDefinition is serial
           ProcessDefinition processDefinition = 
this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                   processInstance.getProcessDefinitionVersion());
           if (processDefinition.getExecutionType().typeIsSerial()) {
               saveSerialProcess(processInstance, processDefinition);
               if (processInstance.getState() != 
WorkflowExecutionStatus.SUBMITTED_SUCCESS) {
                   setSubProcessParam(processInstance);
                   deleteCommandWithCheck(command.getId());
                   return null;
               }
           } else {
               processInstanceDao.upsertProcessInstance(processInstance);
           }
           setSubProcessParam(processInstance);
           deleteCommandWithCheck(command.getId());
           return processInstance;
       }
   
   
    protected @Nullable ProcessInstance constructProcessInstance(Command 
command,
                                                                    String 
host) throws CronParseException, CodeGenerateException {
           ProcessInstance processInstance;
           ProcessDefinition processDefinition;
           CommandType commandType = command.getCommandType();
   
           processDefinition =
                   
this.findProcessDefinition(command.getProcessDefinitionCode(), 
command.getProcessDefinitionVersion());
           if (processDefinition == null) {
               logger.error("cannot find the work process define! define code : 
{}", command.getProcessDefinitionCode());
               throw new IllegalArgumentException("Cannot find the process 
definition for this workflowInstance");
           }
           Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
           int processInstanceId = command.getProcessInstanceId();
           if (processInstanceId == 0) {
               processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
           } else {
               processInstance = 
this.findProcessInstanceDetailById(processInstanceId).orElse(null);
               if (processInstance == null) {
                   return null;
               }
           }
           if (cmdParam != null) { **//this is the point: the RECOVER_SERIAL_WAIT 
command does not have the original params, and the global params in the 
record get deleted** 
               CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
               // reset global params while repeat running is needed by cmdParam
               if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
                   setGlobalParamIfCommanded(processDefinition, cmdParam);
               }
   
               // time zone
               String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
   
               // Recalculate global parameters after rerun.
               String globalParams = 
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
                       processDefinition.getGlobalParamMap(),
                       processDefinition.getGlobalParamList(),
                       commandTypeIfComplement,
                       processInstance.getScheduleTime(), timezoneId);
               processInstance.setGlobalParams(globalParams);
               processInstance.setProcessDefinition(processDefinition);
           }
           // reset command parameter
           if (processInstance.getCommandParam() != null) {
               Map<String, String> processCmdParam = 
JSONUtils.toMap(processInstance.getCommandParam());
               processCmdParam.forEach((key, value) -> {
                   if (!cmdParam.containsKey(key)) {
                       cmdParam.put(key, value);
                   }
               });
           }
           // reset command parameter if sub process
           if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_PROCESS)) 
{
               processInstance.setCommandParam(command.getCommandParam());
           }
           if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
               logger.error("command parameter check failed!");
               return null;
           }
           if (command.getScheduleTime() != null) {
               processInstance.setScheduleTime(command.getScheduleTime());
           }
           processInstance.setHost(host);
           processInstance.setRestartTime(new Date());
           WorkflowExecutionStatus runStatus = 
WorkflowExecutionStatus.RUNNING_EXECUTION;
           int runTime = processInstance.getRunTimes();
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to