This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch dev_wenjun_coronationTask in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit d92c65b72eec8e9d0389ada55040f1882d1e9c88 Author: Wenjun Ruan <[email protected]> AuthorDate: Mon Oct 31 23:13:48 2022 +0800 Fix coronation task cannot recovery --- .../api/controller/CoronationTaskController.java | 13 ++++++++++++- .../api/dto/request/CoronationTaskListingRequest.java | 2 ++ .../api/dto/request/CoronationTaskSubmitRequest.java | 4 +++- .../api/service/impl/CoronationTaskServiceImpl.java | 5 +++-- .../api/service/impl/ProcessInstanceServiceImpl.java | 12 ++++++++++++ .../dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java | 4 +++- .../apache/dolphinscheduler/dao/mapper/CommandMapper.java | 7 ++++--- .../dolphinscheduler/dao/mapper/CoronationTaskMapper.java | 1 + .../dolphinscheduler/dao/mapper/IsolationTaskMapper.java | 1 + .../dao/repository/CoronationTaskDao.java | 1 + .../dolphinscheduler/dao/repository/IsolationTaskDao.java | 2 ++ .../dao/repository/impl/CommandDaoImpl.java | 2 +- .../dao/repository/impl/CoronationTaskDaoImpl.java | 5 +++++ .../dao/repository/impl/IsolationTaskDaoImpl.java | 5 +++++ .../apache/dolphinscheduler/dao/mapper/CommandMapper.xml | 9 +++++++++ .../dolphinscheduler/dao/mapper/CoronationTaskMapper.xml | 6 ++++++ .../dolphinscheduler/dao/mapper/IsolationTaskMapper.xml | 6 ++++++ .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml | 6 ++++++ .../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml | 15 ++++++++++++--- .../coronation/RefreshCoronationMetadataProcessor.java | 2 +- .../server/master/rpc/MasterRPCClient.java | 4 ++++ .../server/master/rpc/MasterRPCServer.java | 1 + .../server/master/runner/WorkflowExecuteRunnable.java | 15 ++++++++++++++- .../server/master/service/CoronationMetadataManager.java | 14 +++++++------- .../service/process/ProcessServiceImpl.java | 3 ++- 25 files changed, 123 insertions(+), 22 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java index 80ee7a1107..7540c118b9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/CoronationTaskController.java @@ -9,6 +9,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.CoronationTaskService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.api.vo.CoronationTaskExcelImportVO; import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.dto.CoronationTaskDTO; @@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import springfox.documentation.annotations.ApiIgnore; @@ -83,7 +85,16 @@ public class CoronationTaskController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result<PageInfo<CoronationTaskDTO>> listingCoronationTasks(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @PathVariable("projectCode") long projectCode, - @RequestBody CoronationTaskListingRequest request) { + @RequestParam(required = false) String workflowInstanceName, + @RequestParam(required = false) String taskName, + @RequestParam Integer pageNo, + @RequestParam Integer pageSize) { + CoronationTaskListingRequest request = CoronationTaskListingRequest.builder() + .workflowInstanceName(workflowInstanceName) + .taskName(taskName) + .pageNo(pageNo) + .pageSize(pageSize) + .build(); return Result.success(coronationTaskService.listingCoronationTasks(loginUser, projectCode, request)); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java index 7ecded2c1c..5e8a384e17 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskListingRequest.java @@ -1,12 +1,14 @@ package org.apache.dolphinscheduler.api.dto.request; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import javax.validation.constraints.NotNull; @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class CoronationTaskListingRequest { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java index 2597a1206e..77d9008ee5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CoronationTaskSubmitRequest.java @@ -1,6 +1,7 @@ package org.apache.dolphinscheduler.api.dto.request; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO; @@ -8,9 +9,10 @@ import org.apache.dolphinscheduler.api.vo.CoronationTaskParseVO; import java.util.List; @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class CoronationTaskSubmitRequest { - private List<CoronationTaskParseVO> CoronationTasks; + private List<CoronationTaskParseVO> coronationTasks; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java index a6c0691c8b..05269693cb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/CoronationTaskServiceImpl.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.coronation.RefreshCoronationMetadataRequest; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; @@ -89,7 +90,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService { throw new ServiceException(Status.CORONATION_TASK_PARSE_ERROR_TASK_NODE_NAME_IS_NOT_VALIDATED); } List<TaskSimpleInfoDTO> previousTaskNodeDTO = - workflowDAG.getPreviousNodes(Long.toString(vo.getTaskCode())) + DagHelper.getAllPreNodes(Long.toString(vo.getTaskCode()), workflowDAG) .stream() .map(previousNodeCode -> { TaskNode node = workflowDAG.getNode(previousNodeCode); @@ -126,7 +127,7 @@ public class CoronationTaskServiceImpl implements CoronationTaskService { List<CoronationTask> coronationTasks = vos.stream() .map(vo -> { - Set<String> previousNodes = workflowDAG.getPreviousNodes(vo.getTaskCode().toString()); + Set<String> previousNodes = DagHelper.getAllPreNodes(vo.getTaskCode().toString(), workflowDAG); Set<String> selectNodes = vo.getUpstreamTasks() .stream() .map(taskNode -> Long.toString(taskNode.getTaskCode())) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index c5a84a2514..5aa0827221 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -61,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.repository.CoronationTaskDao; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; @@ -164,6 +166,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired private CuringParamsService curingGlobalParamsService; + @Autowired + private IsolationTaskDao isolationTaskDao; + + @Autowired + private CoronationTaskDao coronationTaskDao; + /** * return top n SUCCESS process instance order by running time which started between startTime and endTime */ @@ -656,6 +664,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processService.deleteWorkProcessMapByParentId(processInstanceId); processService.deleteWorkTaskInstanceByProcessInstanceId(processInstanceId); + // todo: send refresh RPC request + isolationTaskDao.deleteByWorkflowInstanceId(processInstanceId); + coronationTaskDao.deleteByWorkflowInstanceId(processInstanceId); + Map<String, Object> result = new HashMap<>(); if (delete > 0) { putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java index 9fe05ca9ca..a917a6014c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/dto/TaskSimpleInfoDTO.java @@ -1,15 +1,17 @@ package org.apache.dolphinscheduler.dao.dto; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class TaskSimpleInfoDTO { - private String taskName; + private String taskNode; private long taskCode; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index fd1c8d7204..7066a3a9e1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -50,14 +50,15 @@ public interface CommandMapper extends BaseMapper<Command> { /** * query command page by slot + * * @return command list */ List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot); - void batchInsertCommand(List<Command> commands); + void batchInsertCommand(@Param("commands") List<Command> commands); - List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(@Param("workflowInstanceId") long workflowInstanceId, - @Param("command_type") int commandType); + List<Command> queryCommandByWorkflowInstanceIdAndCommandType(@Param("workflowInstanceId") long workflowInstanceId, + @Param("commandType") int commandType); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java index d5829de216..5aa7465303 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.java @@ -21,4 +21,5 @@ public interface CoronationTaskMapper extends BaseMapper<CoronationTask> { int queryAllCoronationTaskNumber(); + int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java index 3bd1045682..7dd14299fd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.java @@ -23,4 +23,5 @@ public interface IsolationTaskMapper extends BaseMapper<IsolationTask> { List<IsolationTask> queryAllIsolationTask(); + int deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java index 1893e8ffd6..f747f8bb47 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CoronationTaskDao.java @@ -25,4 +25,5 @@ public interface CoronationTaskDao { int queryAllCoronationTaskNumber(); + int deleteByWorkflowInstanceId(Integer processInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java index b38b30569a..e9757cf63b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java @@ -30,4 +30,6 @@ public interface IsolationTaskDao { void insert(IsolationTask isolationTaskDTO); void batchInsert(List<IsolationTask> isolationTasks); + + int deleteByWorkflowInstanceId(Integer processInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java index 2365b5f2bb..f66e78d4bd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java @@ -26,7 +26,7 @@ public class CommandDaoImpl implements CommandDao { @Override public List<Command> queryRecoveryCoronationCommandByWorkflowInstanceId(long workflowInstanceId) { - return commandMapper.queryRecoveryCoronationCommandByWorkflowInstanceId(workflowInstanceId, + return commandMapper.queryCommandByWorkflowInstanceIdAndCommandType(workflowInstanceId, CommandType.RECOVERY_FROM_CORONATION_PAUSE_TASKS.getCode()); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java index 893b28fbbb..b2892781de 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CoronationTaskDaoImpl.java @@ -62,4 +62,9 @@ public class CoronationTaskDaoImpl implements CoronationTaskDao { public int queryAllCoronationTaskNumber() { return coronationTaskMapper.queryAllCoronationTaskNumber(); } + + @Override + public int deleteByWorkflowInstanceId(Integer workflowInstanceId) { + return coronationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java index 0608adba0b..57d5b31de4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java @@ -71,4 +71,9 @@ public class IsolationTaskDaoImpl implements IsolationTaskDao { } isolationTaskMapper.batchInsert(isolationTasks); } + + @Override + public int deleteByWorkflowInstanceId(Integer workflowInstanceId) { + return isolationTaskMapper.deleteByWorkflowInstanceId(workflowInstanceId); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 043a2827b6..15083ba52c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -80,6 +80,7 @@ #{command.taskDependType}, #{command.failureStrategy}, #{command.warningType}, + #{command.warningGroupId}, #{command.scheduleTime}, #{command.startTime}, #{command.executorId}, @@ -91,4 +92,12 @@ ) </foreach> </insert> + + <select id="queryCommandByWorkflowInstanceIdAndCommandType" + resultType="org.apache.dolphinscheduler.dao.entity.Command"> + select * + from t_ds_command + where process_instance_id = #{workflowInstanceId} + and command_type = #{commandType} + </select> </mapper> diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml index 0967fa71ec..fc7bfc9660 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CoronationTaskMapper.xml @@ -66,4 +66,10 @@ </foreach> </insert> + <delete id="deleteByWorkflowInstanceId"> + delete + from t_ds_coronation_task + where workflow_instance_id = #{workflowInstanceId} + </delete> + </mapper> \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml index 390591be26..a6f72bddc2 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml @@ -78,4 +78,10 @@ </foreach> </insert> + <delete id="deleteByWorkflowInstanceId"> + delete + from t_ds_isolation_task + where workflow_instance_id = #{workflowInstanceId} + </delete> + </mapper> diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index f4c0623b70..7fda26c0ad 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -234,6 +234,12 @@ </foreach> order by id asc </select> + <select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance"> + select + <include refid="baseSql"/> + from t_ds_process_instance + where state = #{state} + </select> <select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance"> select <include refid="baseSql"/> diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 7c63a84e2c..87a4a7c0ab 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -42,9 +42,9 @@ <select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer"> select id from t_ds_task_instance - WHERE process_instance_id = #{processInstanceId} - and state = #{state} - and flag = 1 + WHERE process_instance_id = #{processInstanceId} + and state = #{state} + and flag = 1 </select> <select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select @@ -54,6 +54,15 @@ and flag = #{flag} order by start_time desc </select> + <select id="findValidTaskListByProcessIdAndTaskStatus" + resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> + select + <include refid="baseSql"/> + from t_ds_task_instance + WHERE process_instance_id = #{processInstanceId} + and state = #{status} + and flag = #{flag} + </select> <select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select <include refid="baseSql"/> diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java index 1a88860490..2faeee6f5a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/coronation/RefreshCoronationMetadataProcessor.java @@ -18,7 +18,7 @@ public class RefreshCoronationMetadataProcessor implements NettyRequestProcessor @Override public void process(Channel channel, Command command) { - if (command.getType() != CommandType.REFRESH_ISOLATION_METADATA_REQUEST) { + if (command.getType() != CommandType.REFRESH_CORONATION_METADATA_REQUEST) { throw new IllegalArgumentException(String.format("The current rpc command : %s is invalidated", command)); } coronationMetadataManager.refreshCoronationTaskMetadata(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java index 3522ac4fec..f56aef464a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCClient.java @@ -33,4 +33,8 @@ public class MasterRPCClient { client.sendSync(host, rpcCommand, timeoutMills); } + public void sendCommand(@NonNull Host host, @NonNull Command rpcCommand) throws RemotingException { + client.send(host, rpcCommand); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index 4644611a7f..3aba5c6bde 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -74,6 +74,7 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor; + @Autowired private RefreshCoronationMetadataProcessor refreshCoronationMetadataProcessor; @Autowired diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index b284c80ef2..ebe2000626 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -1655,6 +1655,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION return ExecutionStatus.PAUSE_BY_ISOLATION; } + if (coronationMetadataManager.isInCoronationMode()) { + Optional<TaskInstance> pauseByCoronationTaskInstance = taskInstanceMap.values().stream() + .filter(taskInstance -> taskInstance.getState().typeIsPauseByCoronation()) + .findAny(); + if (pauseByCoronationTaskInstance.isPresent()) { + return ExecutionStatus.PAUSE_BY_CORONATION; + } + } + // if the waiting queue is empty and the status is in progress, then success return ExecutionStatus.SUCCESS; } @@ -1999,7 +2008,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { coronattedTaskCodeToTimesMap.getOrDefault(taskCode, 0) + 1); Integer parentNodeInstanceId = validTaskMap.get(parentNodeCode); if (parentNodeInstanceId != null) { - TaskInstance taskInstance = activeTaskProcessorMaps.get(parentNodeInstanceId).taskInstance(); + ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(parentNodeInstanceId); + if (iTaskProcessor == null) { + continue; + } + TaskInstance taskInstance = iTaskProcessor.taskInstance(); if (taskInstance.getState().typeIsPauseByCoronation()) { // resubmit the task to standbylist, this task will be resubmit again. taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java index 68aba94bda..d698c3f20b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/CoronationMetadataManager.java @@ -94,10 +94,10 @@ public class CoronationMetadataManager { if (coronationTaskInMemory.isEmpty()) { if (coronationMode == CoronationMode.IN_CORONATION) { log.info("There is not coronation tasks, will begin to close coronation mode..."); - closeCoronation(); coronationMode = CoronationMode.NOT_IN_CORONATION; log.info("Close coronation mode success..."); } + insertRecoveryCoronationCommandIfNeeded(); } else { addCoronationTasks(addCoronationTasks); cancelCoronationTasks(deleteCoronationTasks); @@ -107,7 +107,7 @@ public class CoronationMetadataManager { } } stopWatch.stop(); - log.info("Refresh coronation task from DB finished, cost: {}", stopWatch.getTime()); + log.info("Refresh coronation task from DB finished, cost: {} ms", stopWatch.getTime()); } public boolean isCoronationTask(int workflowInstanceId, long taskCode) { @@ -135,7 +135,7 @@ public class CoronationMetadataManager { RefreshCoronationMetadataRequest request = new RefreshCoronationMetadataRequest(); for (Server master : masters) { try { - masterRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()), + masterRPCClient.sendCommand(new Host(master.getHost(), master.getPort()), request.convert2Command()); } catch (Exception e) { log.error( @@ -151,19 +151,19 @@ public class CoronationMetadataManager { } } - private void closeCoronation() { + private void insertRecoveryCoronationCommandIfNeeded() { // The current server is in coronation mode, need to close coronation. // Need to acquire a lock to guarantee there is only one master recovery the pause_by_coronation workflow + // block to acquire the master lock try { - // block to acquire the master lock if (!registryClient.getLock(NodeType.MASTER.getRegistryPath())) { - log.error("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath()); + log.warn("Cannot acquire the master lock: {} to close coronation", NodeType.MASTER.getRegistryPath()); return; } // find the all instance that need to be recovery // create recovery command List<Command> needToInsertCommand = - processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_ISOLATION) + processInstanceDao.queryProcessInstanceByStatus(ExecutionStatus.PAUSE_BY_CORONATION) .stream() .filter(processInstance -> { List<Command> commands = commandDao diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 43000df020..38d6535f1d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1633,7 +1633,8 @@ public class ProcessServiceImpl implements ProcessService { || state == ExecutionStatus.DELAY_EXECUTION || state == ExecutionStatus.KILL || state == ExecutionStatus.DISPATCH - || state == ExecutionStatus.PAUSE_BY_ISOLATION) { + || state == ExecutionStatus.PAUSE_BY_ISOLATION + || state == ExecutionStatus.PAUSE_BY_CORONATION) { return state; } // return pasue /stop if process instance state is ready pause / stop
