ruanwenjun commented on code in PR #16327: URL: https://github.com/apache/dolphinscheduler/pull/16327#discussion_r1730696598
########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.task.statemachine; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus.DISPATCH; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; +import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; +import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; +import 
org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; +import org.apache.dolphinscheduler.server.master.engine.TaskGroupCoordinator; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRunningLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.google.common.collect.Lists; + +@Slf4j +public abstract class 
AbstractTaskStateAction implements ITaskStateAction { + + @Autowired + protected TaskGroupCoordinator taskGroupCoordinator; + + @Autowired + protected TaskInstanceDao taskInstanceDao; + + @Autowired + protected TaskInstanceFactories taskInstanceFactories; + + @Autowired + protected IWorkflowRepository workflowRepository; + + @Autowired + private MasterConfig masterConfig; + + @Autowired + protected ITaskExecutorClient taskExecutorClient; + + /** + * Whether the task needs to acquire the task group slot. + */ + protected boolean isTaskNeedAcquireTaskGroupSlot(final ITaskExecutionRunnable taskExecutionRunnable) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + return taskGroupCoordinator.needAcquireTaskGroupSlot(taskInstance); + } + + /** + * Acquire the resources needed by the task instance. + * <p> If the task instance is using a task group, the task group slot will be acquired. + */ + protected void acquireTaskGroupSlot(final ITaskExecutionRunnable taskExecutionRunnable) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskGroupCoordinator.acquireTaskGroupSlot(taskInstance); + } + + /** + * Release the resources needed by the task instance. 
+ */ + protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable taskExecutionRunnable) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + if (taskGroupCoordinator.needToReleaseTaskGroupSlot(taskInstance)) { + taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); + } + } + + @Override + public void dispatchedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable, + final TaskDispatchedLifecycleEvent taskDispatchedEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(DISPATCH); + taskInstance.setHost(taskDispatchedEvent.getExecutorHost()); + taskInstanceDao.updateById(taskInstance); + } + + protected void persistentTaskInstanceStartedEventToDB(final ITaskExecutionRunnable taskExecutionRunnable, + final TaskRunningLifecycleEvent taskRunningEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); + taskInstance.setStartTime(taskRunningEvent.getStartTime()); + taskInstance.setLogPath(taskRunningEvent.getLogPath()); + if (StringUtils.isNotEmpty(taskRunningEvent.getRuntimeContext())) { + taskInstance.setAppLink(taskRunningEvent.getRuntimeContext()); + } + taskInstanceDao.updateById(taskInstance); + } + + @Override + public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable, + final TaskPausedLifecycleEvent taskPausedEvent) { + releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable); + persistentTaskInstancePausedEventToDB(taskExecutionRunnable, taskPausedEvent); + taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable); + publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + } + + private void persistentTaskInstancePausedEventToDB(final 
ITaskExecutionRunnable taskExecutionRunnable, + final TaskPausedLifecycleEvent taskPausedEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(TaskExecutionStatus.PAUSE); + taskInstanceDao.updateById(taskInstance); + } + + @Override + public void killedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable, + final TaskKilledLifecycleEvent taskInstanceKillEvent) { + releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable); + persistentTaskInstanceKilledEventToDB(taskExecutionRunnable, taskInstanceKillEvent); + taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable); + publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + } + + private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable taskExecutionRunnable, + final TaskKilledLifecycleEvent taskKilledEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(TaskExecutionStatus.KILL); + taskInstance.setEndTime(taskKilledEvent.getEndTime()); + taskInstanceDao.updateById(taskInstance); + + } + + @Override + public void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable, + final TaskFailedLifecycleEvent taskFailedEvent) { + releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable); + persistentTaskInstanceFailedEventToDB(taskExecutionRunnable, taskFailedEvent); + + if (taskExecutionRunnable.isTaskInstanceNeedRetry()) { + taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable)); + return; + } + // If all successors are condition tasks, then the task will not be marked as failure. + // And the DAG will continue to execute. 
+ final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph(); + if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) { + publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + return; + } + taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable); + publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + } + + private void persistentTaskInstanceFailedEventToDB(final ITaskExecutionRunnable taskExecutionRunnable, + final TaskFailedLifecycleEvent taskFailedEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(TaskExecutionStatus.FAILURE); + taskInstance.setEndTime(taskFailedEvent.getEndTime()); + taskInstanceDao.updateById(taskInstance); + } + + @Override + public void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable, + final TaskSuccessLifecycleEvent taskSuccessEvent) { + releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable); + persistentTaskInstanceSuccessEventToDB(taskExecutionRunnable, taskSuccessEvent); + mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable); + publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); + } + + protected void mergeTaskVarPoolToWorkflow(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final ITaskExecutionRunnable taskExecutionRunnable) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + final ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final String finalVarPool = VarPoolUtils.mergeVarPoolJsonString( + Lists.newArrayList(workflowInstance.getVarPool(), taskInstance.getVarPool())); + workflowInstance.setVarPool(finalVarPool); + } + + protected void persistentTaskInstanceSuccessEventToDB(final 
ITaskExecutionRunnable taskExecutionRunnable, + final TaskSuccessLifecycleEvent taskSuccessEvent) { + final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); + taskInstance.setState(TaskExecutionStatus.SUCCESS); + taskInstance.setEndTime(taskSuccessEvent.getEndTime()); + taskInstance.setVarPool(taskSuccessEvent.getVarPool()); + taskInstanceDao.updateById(taskInstance); + } + + /** + * Failover task. + * <p> Will try to take over the task from remote executor, if take-over success, the task has no effect. + * <p> If the take-over fails, will generate a failover task-instance and mark the task instance status to {@link TaskExecutionStatus#NEED_FAULT_TOLERANCE}. + */ + protected void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable) { + if (!taskExecutionRunnable.isTaskInstanceInitialized()) { + throw new IllegalStateException("The task instance hasn't been initialized, cannot take over the task"); + } + if (takeOverTask(taskExecutionRunnable)) { + log.info("Failover task success, the task {} has been taken-over", taskExecutionRunnable.getName()); + return; + } + taskExecutionRunnable.initializeFailoverTaskInstance(); + tryToDispatchTask(taskExecutionRunnable); + log.info("Failover task success, the task {} has been resubmitted.", taskExecutionRunnable.getName()); + } + + private boolean takeOverTask(final ITaskExecutionRunnable taskExecutionRunnable) { + if (!taskExecutionRunnable.isTaskInstanceInitialized()) { + log.debug("Task: {} doesn't initialized yet, cannot take over the task", taskExecutionRunnable.getName()); + return false; + } + if (TaskTypeUtils.isLogicTask(taskExecutionRunnable.getTaskInstance().getTaskType())) { + return false; + } + if (StringUtils.isEmpty(taskExecutionRunnable.getTaskInstance().getHost())) { + log.debug("Task: {} host is empty, cannot take over the task", taskExecutionRunnable.getName()); + return false; + } + try { + final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() + 
.taskInstanceId(taskExecutionRunnable.getTaskInstance().getId()) + .workflowHost(masterConfig.getMasterAddress()) + .build(); + final TakeOverTaskResponse takeOverTaskResponse = SingletonJdkDynamicRpcClientProxyFactory + .withService(ITaskInstanceOperator.class) + .withHost(taskExecutionRunnable.getTaskInstance().getHost()) + .takeOverTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.info("Take over task: {} failed", taskExecutionRunnable.getName(), ex); Review Comment: I changed the log level here from info to warn, because an exception is expected at this point whenever the worker is not alive. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
