This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 801216bd35 RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start
parameters (#13958)
801216bd35 is described below
commit 801216bd35634ed2b1bb1f0ea9fa80140fb17e78
Author: Drake Youngkun Min <[email protected]>
AuthorDate: Wed May 10 14:02:21 2023 +0900
RECOVER_TOLERANCE_FAULT_PROCESS CommandType needs the start parameters
(#13958)
---
.../service/process/ProcessServiceImpl.java | 20 +++++++++++++----
.../service/process/ProcessServiceTest.java | 25 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index de19938dd9..d0f56f329f 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -23,6 +23,7 @@ import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@@ -770,8 +771,9 @@ public class ProcessServiceImpl implements ProcessService {
}
CommandType commandTypeIfComplement =
getCommandTypeIfComplement(processInstance, command);
- // reset global params while repeat running is needed by cmdParam
- if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+ // reset global params while repeat running and recover tolerance
fault process is needed by cmdParam
+ if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
+ commandTypeIfComplement ==
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
@@ -1590,8 +1592,7 @@ public class ProcessServiceImpl implements ProcessService
{
cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
cmd.setProcessInstanceId(processInstance.getId());
- cmd.setCommandParam(
- String.format("{\"%s\":%d}",
CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@@ -2605,4 +2606,15 @@ public class ProcessServiceImpl implements
ProcessService {
triggerRelationService.saveCommandTrigger(commandId,
processInstanceId);
}
+ private Map<String, Object> createCommandParams(ProcessInstance
processInstance) {
+ Map<String, Object> commandMap =
+ JSONUtils.parseObject(processInstance.getCommandParam(), new
TypeReference<Map<String, Object>>() {
+ });
+ Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
+ Optional.ofNullable(MapUtils.getObject(commandMap,
CMD_PARAM_START_PARAMS))
+ .ifPresent(startParams ->
recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+ recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING,
processInstance.getId());
+ return recoverFailoverCommandParams;
+ }
+
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 81ef7a214f..1e94407a4b 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -370,6 +370,31 @@ public class ProcessServiceTest {
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(host,
command9);
Assertions.assertNotNull(processInstance10);
+
+ // build command same as
processService.processNeedFailoverProcessInstances(processInstance);
+ Command command12 = new Command();
+ command12.setId(12);
+ command12.setProcessDefinitionCode(definitionCode);
+ command12.setProcessDefinitionVersion(definitionVersion);
+ command12.setProcessInstanceId(processInstanceId);
+ command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
+ HashMap<String, String> startParams12 = new HashMap<>();
+ startParams12.put("startParam11", "testStartParam11");
+ HashMap<String, String> commandParams12 = new HashMap<>();
+ commandParams12.put(CMD_PARAM_START_PARAMS,
JSONUtils.toJsonString(startParams12));
+ commandParams12.put("ProcessInstanceId", "222");
+ command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
+
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+ Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
+ Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
+ processInstance.getScheduleTime(),
null)).thenReturn("\"testStartParam11\"");
+ ProcessInstance processInstance13 = processService.handleCommand(host,
command12);
+ Assertions.assertNotNull(processInstance13);
+ Assertions.assertNotNull(processInstance13.getGlobalParams());
+
Assertions.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
}
@Test