This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.7-prepare by this push:
     new 9b094e980b cherry-pick [Bug-13951 ][dolphinscheduler-service] StartParams is not applied when task is failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType) #13958
9b094e980b is described below

commit 9b094e980bc6f67f0e5b9216cc8f38cf22809a8c
Author: Drake Youngkun Min <[email protected]>
AuthorDate: Wed May 10 14:02:21 2023 +0900

    cherry-pick [Bug-13951 ][dolphinscheduler-service] StartParams is not applied when task is failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType) #13958
---
 .../service/process/ProcessServiceImpl.java        | 19 ++++++++++++----
 .../service/process/ProcessServiceTest.java        | 25 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 9a5780aaf2..93fc7e2490 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -938,8 +938,9 @@ public class ProcessServiceImpl implements ProcessService {
         }
         if (cmdParam != null) {
             CommandType commandTypeIfComplement = 
getCommandTypeIfComplement(processInstance, command);
-            // reset global params while repeat running is needed by cmdParam
-            if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+            // reset global params while repeat running and recover tolerance 
fault process is needed by cmdParam
+            if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
+                    commandTypeIfComplement == 
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
                 setGlobalParamIfCommanded(processDefinition, cmdParam);
             }
 
@@ -2081,8 +2082,7 @@ public class ProcessServiceImpl implements ProcessService {
         cmd.setProcessDefinitionCode(processDefinition.getCode());
         cmd.setProcessDefinitionVersion(processDefinition.getVersion());
         cmd.setProcessInstanceId(processInstance.getId());
-        cmd.setCommandParam(
-                String.format("{\"%s\":%d}", 
CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+        
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
         cmd.setExecutorId(processInstance.getExecutorId());
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@@ -3202,4 +3202,15 @@ public class ProcessServiceImpl implements ProcessService {
             }
         }
     }
+
+    private Map<String, Object> createCommandParams(ProcessInstance 
processInstance) {
+        Map<String, Object> commandMap =
+                JSONUtils.parseObject(processInstance.getCommandParam(), new 
TypeReference<Map<String, Object>>() {
+                });
+        Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
+        Optional.ofNullable(MapUtils.getObject(commandMap, 
CMD_PARAM_START_PARAMS))
+                .ifPresent(startParams -> 
recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+        recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, 
processInstance.getId());
+        return recoverFailoverCommandParams;
+    }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 3e379ebcf4..d44c8c01c5 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -477,6 +477,31 @@ public class ProcessServiceTest {
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
         ProcessInstance processInstance10 = processService.handleCommand(host, 
command9);
         Assert.assertTrue(processInstance10 != null);
+
+        // build command same as 
processService.processNeedFailoverProcessInstances(processInstance);
+        Command command12 = new Command();
+        command12.setId(12);
+        command12.setProcessDefinitionCode(definitionCode);
+        command12.setProcessDefinitionVersion(definitionVersion);
+        command12.setProcessInstanceId(processInstanceId);
+        command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
+        HashMap<String, String> startParams12 = new HashMap<>();
+        startParams12.put("startParam11", "testStartParam11");
+        HashMap<String, String> commandParams12 = new HashMap<>();
+        commandParams12.put(CMD_PARAM_START_PARAMS, 
JSONUtils.toJsonString(startParams12));
+        commandParams12.put("ProcessInstanceId", "222");
+        command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
+        
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+        Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
+        Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
+                processInstance.getScheduleTime(), 
null)).thenReturn("\"testStartParam11\"");
+        ProcessInstance processInstance13 = processService.handleCommand(host, 
command12);
+        Assert.assertNotNull(processInstance13);
+        Assert.assertNotNull(processInstance13.getGlobalParams());
+        
Assert.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
     }
 
     @Test(expected = ServiceException.class)

Reply via email to