This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
     new 1105a05fae [Improvement][Master] Construct processInstance may NPE 
when master handling command (#12056) (#12776)
1105a05fae is described below

commit 1105a05fae76a965b1e7e305df0eaf0aad466c11
Author: Eric Gao <[email protected]>
AuthorDate: Mon Nov 7 17:02:20 2022 +0800

    [Improvement][Master] Construct processInstance may NPE when master 
handling command (#12056) (#12776)
    
    Co-authored-by: xuhaihui <[email protected]>
    Co-authored-by: xuhhui <[email protected]>
    Co-authored-by: xuhaihui <[email protected]>
---
 .../service/process/ProcessServiceImpl.java        | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 81a70939ac..c552740021 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -909,6 +909,9 @@ public class ProcessServiceImpl implements ProcessService {
             throw new IllegalArgumentException("Cannot find the process 
definition for this workflowInstance");
         }
         Map<String, String> cmdParam = 
JSONUtils.toMap(command.getCommandParam());
+        if(cmdParam == null){
+            cmdParam = new HashMap<>();
+        }
         int processInstanceId = command.getProcessInstanceId();
         if (processInstanceId == 0) {
             processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
@@ -918,35 +921,37 @@ public class ProcessServiceImpl implements ProcessService 
{
                 return null;
             }
         }
-        if (cmdParam != null) {
-            CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
-            // reset global params while repeat running is needed by cmdParam
-            if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
-                setGlobalParamIfCommanded(processDefinition, cmdParam);
-            }
 
-            // time zone
-            String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
+        CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
+        // reset global params while repeat running is needed by cmdParam
+        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+            setGlobalParamIfCommanded(processDefinition, cmdParam);
+        }
 
-            // Recalculate global parameters after rerun.
-            processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+        // time zone
+        String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
+
+        // Recalculate global parameters after rerun.
+        String globalParams = ParameterUtils.curingGlobalParams(
                 processDefinition.getGlobalParamMap(),
                 processDefinition.getGlobalParamList(),
                 commandTypeIfComplement,
-                processInstance.getScheduleTime(), timezoneId));
-            processInstance.setProcessDefinition(processDefinition);
-        }
-        //reset command parameter
+                processInstance.getScheduleTime(), timezoneId);
+        processInstance.setGlobalParams(globalParams);
+        processInstance.setProcessDefinition(processDefinition);
+
+        // reset command parameter
         if (processInstance.getCommandParam() != null) {
             Map<String, String> processCmdParam = 
JSONUtils.toMap(processInstance.getCommandParam());
+            Map<String, String> finalCmdParam = cmdParam;
             processCmdParam.forEach((key, value) -> {
-                if (!cmdParam.containsKey(key)) {
-                    cmdParam.put(key, value);
+                if (!finalCmdParam.containsKey(key)) {
+                    finalCmdParam.put(key, value);
                 }
             });
         }
         // reset command parameter if sub process
-        if (cmdParam != null && 
cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+        if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
             processInstance.setCommandParam(command.getCommandParam());
         }
         if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
@@ -995,7 +1000,9 @@ public class ProcessServiceImpl implements ProcessService {
                     // initialize the pause state
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
-                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, 
String.join(",", convertIntListToString(suspendedNodeList)));
+
+                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
+                    String.join(Constants.COMMA, 
convertIntListToString(stopNodeList)));
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;

Reply via email to