This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 801216bd35 RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start 
parameters (#13958)
801216bd35 is described below

commit 801216bd35634ed2b1bb1f0ea9fa80140fb17e78
Author: Drake Youngkun Min <[email protected]>
AuthorDate: Wed May 10 14:02:21 2023 +0900

    RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start parameters 
(#13958)
---
 .../service/process/ProcessServiceImpl.java        | 20 +++++++++++++----
 .../service/process/ProcessServiceTest.java        | 25 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index de19938dd9..d0f56f329f 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -23,6 +23,7 @@ import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_EMPTY_SUB_PROCESS;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@@ -770,8 +771,9 @@ public class ProcessServiceImpl implements ProcessService {
         }
 
         CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
-        // reset global params while repeat running is needed by cmdParam
-        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+        // reset global params while repeat running and recover tolerance 
fault process is needed by cmdParam
+        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
+                commandTypeIfComplement == 
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
             setGlobalParamIfCommanded(processDefinition, cmdParam);
         }
 
@@ -1590,8 +1592,7 @@ public class ProcessServiceImpl implements ProcessService 
{
         
cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         
cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
         cmd.setProcessInstanceId(processInstance.getId());
-        cmd.setCommandParam(
-                String.format("{\"%s\":%d}", 
CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+        
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@@ -2605,4 +2606,15 @@ public class ProcessServiceImpl implements 
ProcessService {
         triggerRelationService.saveCommandTrigger(commandId, 
processInstanceId);
     }
 
+    private Map<String, Object> createCommandParams(ProcessInstance 
processInstance) {
+        Map<String, Object> commandMap =
+                JSONUtils.parseObject(processInstance.getCommandParam(), new 
TypeReference<Map<String, Object>>() {
+                });
+        Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
+        Optional.ofNullable(MapUtils.getObject(commandMap, 
CMD_PARAM_START_PARAMS))
+                .ifPresent(startParams -> 
recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+        recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, 
processInstance.getId());
+        return recoverFailoverCommandParams;
+    }
+
 }
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 81ef7a214f..1e94407a4b 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -370,6 +370,31 @@ public class ProcessServiceTest {
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
         ProcessInstance processInstance10 = processService.handleCommand(host, 
command9);
         Assertions.assertNotNull(processInstance10);
+
+        // build command same as 
processService.processNeedFailoverProcessInstances(processInstance);
+        Command command12 = new Command();
+        command12.setId(12);
+        command12.setProcessDefinitionCode(definitionCode);
+        command12.setProcessDefinitionVersion(definitionVersion);
+        command12.setProcessInstanceId(processInstanceId);
+        command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
+        HashMap<String, String> startParams12 = new HashMap<>();
+        startParams12.put("startParam11", "testStartParam11");
+        HashMap<String, String> commandParams12 = new HashMap<>();
+        commandParams12.put(CMD_PARAM_START_PARAMS, 
JSONUtils.toJsonString(startParams12));
+        commandParams12.put("ProcessInstanceId", "222");
+        command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
+        
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+        Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
+        Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
+                processInstance.getScheduleTime(), 
null)).thenReturn("\"testStartParam11\"");
+        ProcessInstance processInstance13 = processService.handleCommand(host, 
command12);
+        Assertions.assertNotNull(processInstance13);
+        Assertions.assertNotNull(processInstance13.getGlobalParams());
+        
Assertions.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
     }
 
     @Test

Reply via email to