This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch dev_wenjun_refactorMaster in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit aed4e80a268727382ad6828ee7ac64a62608cf03 Author: Wenjun Ruan <[email protected]> AuthorDate: Tue Mar 5 15:48:00 2024 +0800 Refactor Master --- .../server/master/dag/DAGEngine.java | 87 ++++++++++++++++++++ .../server/master/dag/IDAGEngine.java | 42 ++++++++++ .../server/master/dag/IDAGEngineBuilder.java | 7 ++ .../master/dag/IEventfulExecutionRunnable.java | 11 +++ .../server/master/dag/IWorkflowDAG.java | 23 ++++++ .../server/master/dag/IWorkflowDAGBuilder.java | 7 ++ .../server/master/dag/IWorkflowEngine.java | 49 ++++++++++++ .../server/master/dag/IWorkflowExecutionDAG.java | 16 ++++ .../master/dag/IWorkflowExecutionDAGBuilder.java | 6 ++ .../master/dag/IWorkflowExecutionRunnable.java | 68 ++++++++++++++++ .../dag/IWorkflowExecutionRunnableBuilder.java | 9 +++ .../dag/TaskExecutionRunnableRepository.java | 25 ++++++ .../server/master/dag/TaskIdentify.java | 5 ++ .../master/dag/TaskTriggerConditionChecker.java | 7 ++ .../server/master/dag/WorkflowEngine.java | 75 +++++++++++++++++ .../master/dag/WorkflowExecuteRunnableFactory.java | 61 ++++++++++++++ .../dag/WorkflowExecuteRunnableRepository.java | 28 +++++++ .../master/dag/WorkflowExecutionContext.java | 21 +++++ .../master/dag/WorkflowExecutionRunnable.java | 64 +++++++++++++++ .../server/master/dag/WorkflowIdentify.java | 18 +++++ .../server/master/events/EventDispatcher.java | 53 ++++++++++++ .../server/master/events/EventEngine.java | 93 ++++++++++++++++++++++ .../server/master/events/EventFirer.java | 78 ++++++++++++++++++ .../server/master/events/EventOperatorManager.java | 19 +++++ .../server/master/events/EventRepository.java | 36 +++++++++ .../server/master/events/IAsyncEvent.java | 7 ++ .../server/master/events/IEvent.java | 5 ++ .../server/master/events/IEventDispatcher.java | 15 ++++ .../server/master/events/IEventFirer.java | 19 +++++ .../server/master/events/IEventOperator.java | 15 ++++ .../master/events/IEventOperatorManager.java | 16 ++++ .../server/master/events/IEventRepository.java | 16 ++++ .../server/master/events/ISyncEvent.java | 4 + .../server/master/events/ITaskEvent.java | 9 +++ .../server/master/events/ITaskEventOperator.java | 5 ++ .../server/master/events/IWorkflowEvent.java | 12 +++ .../master/events/IWorkflowEventOperator.java | 6 ++ .../master/events/TaskLogSendToRemoteEvent.java | 19 +++++ .../events/TaskLogSendToRemoteEventOperator.java | 21 +++++ .../server/master/events/TaskOperationEvent.java | 18 +++++ .../master/events/TaskOperationEventOperator.java | 48 +++++++++++ .../server/master/events/TaskOperationType.java | 8 ++ .../server/master/events/TaskSuccessEvent.java | 16 ++++ .../master/events/TaskSuccessEventOperator.java | 30 +++++++ .../server/master/events/WorkflowFailedEvent.java | 18 +++++ .../master/events/WorkflowFailedEventOperator.java | 43 ++++++++++ .../master/events/WorkflowFinalizeEvent.java | 15 ++++ .../events/WorkflowFinalizeEventOperator.java | 26 ++++++ .../server/master/events/WorkflowFinishEvent.java | 20 +++++ .../master/events/WorkflowOperationEvent.java | 24 ++++++ .../events/WorkflowOperationEventOperator.java | 68 ++++++++++++++++ .../master/events/WorkflowOperationType.java | 10 +++ .../server/master/events/WorkflowTimeoutEvent.java | 16 ++++ .../events/WorkflowTimeoutEventOperator.java | 43 ++++++++++ .../events/WorkflowTriggerNextTaskEvent.java | 21 +++++ .../WorkflowTriggerNextTaskEventOperator.java | 27 +++++++ .../master/events/WorkflowTriggeredEvent.java | 16 ++++ .../events/WorkflowTriggeredEventOperator.java | 44 ++++++++++ .../TaskExecuteRunnableNotFoundException.java | 12 +++ .../TaskExecutionContextCreateException.java | 26 ------ .../WorkflowExecuteRunnableNotFoundException.java | 13 +++ 61 files changed, 1613 insertions(+), 26 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java new file mode 100644 index 0000000000..4748fb5084 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java @@ -0,0 +1,87 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.EventRepository; +import org.apache.dolphinscheduler.server.master.events.TaskOperationEvent; +import org.apache.dolphinscheduler.server.master.events.TaskOperationType; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory; + +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Builder +@AllArgsConstructor +public class DAGEngine implements IDAGEngine { + + private final IWorkflowExecutionDAG workflowExecutionDAG; + + private final List<TaskTriggerConditionChecker> taskTriggerConditionCheckers; + + private final TaskExecuteRunnableFactory<TaskExecuteRunnable> taskExecuteRunnableFactory; + + private final EventRepository eventRepository; + + @Override + public void triggerNextTasks(String parentTaskNodeName) { + workflowExecutionDAG.getWorkflowDAG().getPostNodeNames(parentTaskNodeName).forEach(this::triggerTask); + } + + @Override + @SneakyThrows + public void triggerTask(String taskName) { + for (TaskTriggerConditionChecker taskTriggerConditionChecker : taskTriggerConditionCheckers) { + if (!taskTriggerConditionChecker.taskCanTrigger(taskName)) { + return; + } + } + // todo: create Task ExecutionRunnable + TaskExecuteRunnable taskExecuteRunnable = taskExecuteRunnableFactory.createTaskExecuteRunnable(null); + TaskOperationEvent taskOperationEvent = + TaskOperationEvent.builder().workflowInstanceId(taskExecuteRunnable.getWorkflowInstance().getId()) + .taskInstanceId(taskExecuteRunnable.getTaskInstance().getId()) + .taskOperationType(TaskOperationType.DISPATCH).build(); + eventRepository.storeEventToTail(taskOperationEvent); + workflowExecutionDAG.markTaskSubmitted(taskExecuteRunnable); + + } + + @Override + public void pauseTask(Integer taskInstanceId) { + TaskExecuteRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId); + if (taskExecutionRunnable == null) { + log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId); + return; + } + + TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder() + .workflowInstanceId(taskExecutionRunnable.getTaskInstance().getProcessInstanceId()) + .taskInstanceId(taskExecutionRunnable.getTaskInstance().getId()) + .taskOperationType(TaskOperationType.PAUSE).build(); + eventRepository.storeEventToTail(taskOperationEvent); + } + + @Override + public void killTask(Integer taskInstanceId) { + TaskExecuteRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId); + if (taskExecutionRunnable == null) { + log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId); + return; + } + + TaskOperationEvent taskOperationEvent = + TaskOperationEvent.builder().workflowInstanceId(taskExecutionRunnable.getWorkflowInstance().getId()) + .taskInstanceId(taskExecutionRunnable.getTaskInstance().getId()) + .taskOperationType(TaskOperationType.KILL).build(); + eventRepository.storeEventToTail(taskOperationEvent); + } + + @Override + public IWorkflowExecutionDAG getWorkflowExecutionDAG() { + return workflowExecutionDAG; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java new file mode 100644 index 0000000000..5e3b651872 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java @@ -0,0 +1,42 @@ +package org.apache.dolphinscheduler.server.master.dag; + +/** + * The IDAGEngine is responsible for triggering, killing, pausing, and finalizing task in {@link IWorkflowExecutionDAG}. + * <p>All DAG operation should directly use the method in IDAGEngine, new {@link IWorkflowExecutionDAG} should be triggered by new IDAGEngine. + */ +public interface IDAGEngine { + + /** + * Trigger the tasks which are post of the given task. + * <P> If there are no task after the given taskNode, will try to finish the WorkflowExecutionRunnable. + * <p> If the + * + * @param parentTaskNodeName the parent task name + */ + void triggerNextTasks(String parentTaskNodeName); + + /** + * Trigger the given task + * + * @param taskName task name + */ + void triggerTask(String taskName); + + /** + * Pause the given task. + */ + void pauseTask(Integer taskId); + + /** + * Kill the given task. + */ + void killTask(Integer taskId); + + /** + * Get {@link IWorkflowExecutionDAG} belong to the Engine. + * + * @return workflow execution DAG. + */ + IWorkflowExecutionDAG getWorkflowExecutionDAG(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java new file mode 100644 index 0000000000..8e713d1274 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface IDAGEngineBuilder { + + IDAGEngine buildDAGEngine(IWorkflowExecutionDAG workflowExecutionDAG); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java new file mode 100644 index 0000000000..dcc87e050d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java @@ -0,0 +1,11 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEvent; +import org.apache.dolphinscheduler.server.master.events.IEventRepository; + +public interface IEventfulExecutionRunnable { + + default IEventRepository<IEvent> getEventRepository() { + return null; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java new file mode 100644 index 0000000000..08a69e41ab --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java @@ -0,0 +1,23 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import java.util.List; + +public interface IWorkflowDAG { + + /** + * Return the post task name of given parentTaskName. + * + * @param parentTaskName parent task name, can be null. + * @return post task name list, sort by priority. + */ + List<String> getPostNodeNames(String parentTaskName); + + /** + * Get the pre task name of given taskName + * + * @param taskName task name can be null. + * @return parent task name list. + */ + List<String> getParentNodeNames(String taskName); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java new file mode 100644 index 0000000000..3b95fd07b1 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface IWorkflowDAGBuilder { + + IWorkflowDAG buildWorkflowDAG(WorkflowIdentify workflowIdentify); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java new file mode 100644 index 0000000000..b7c430cfe2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java @@ -0,0 +1,49 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException; + +/** + * The WorkflowEngine is responsible for starting, stopping, pausing, and finalizing workflows. + */ +public interface IWorkflowEngine { + + /** + * Start the workflow engine. + */ + void start(); + + /** + * Trigger a workflow to start. + * + * @param workflowExecuteRunnable the workflow to start + */ + void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable); + + /** + * Pause a workflow instance. + * + * @param workflowInstanceId the ID of the workflow to pause + * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found + */ + void pauseWorkflow(Integer workflowInstanceId); + + /** + * Kill a workflow instance. + * + * @param workflowInstanceId the ID of the workflow to stop + * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found + */ + void killWorkflow(Integer workflowInstanceId); + + /** + * Finalize a workflow instance. Once a workflow has been finalized, then it cannot receive new operation, and will be removed from memory. + * + * @param workflowInstanceId the ID of the workflow to finalize + */ + void finalizeWorkflow(Integer workflowInstanceId); + + /** + * Stop the workflow engine. + */ + void stop(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java new file mode 100644 index 0000000000..ed94fcd0de --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; + +import java.util.List; + +public interface IWorkflowExecutionDAG { + + IWorkflowDAG getWorkflowDAG(); + + void markTaskSubmitted(TaskExecuteRunnable taskExecutionRunnable); + + TaskExecuteRunnable getTaskExecutionRunnableById(Integer taskInstanceId); + + List<TaskExecuteRunnable> getActiveTaskExecutionRunnables(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java new file mode 100644 index 0000000000..a13865aa54 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java @@ -0,0 +1,6 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface IWorkflowExecutionDAGBuilder { + + IWorkflowExecutionDAG buildWorkflowExecutionDAG(IWorkflowDAG workflowDAG); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java new file mode 100644 index 0000000000..bff326c12c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +/** + * The IWorkflowExecuteRunnable represent a running workflow instance, it is responsible for operate the workflow instance. e.g. start, kill, pause, timeout. + */ +public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable { + + /** + * Start the workflow instance. + */ + void start(); + + /** + * Kill the workflow instance. + */ + void kill(); + + /** + * Pause the workflow instance. + */ + void pause(); + + /** + * Get the workflow execution context. + * + * @return the workflow execution context + */ + WorkflowExecutionContext getWorkflowExecutionContext(); + + /** + * Get the {@link IDAGEngine} which used to execute the dag of the workflow instance. + * + * @return dag engine. + */ + IDAGEngine getDagEngine(); + + default Integer getWorkflowInstanceId() { + return getWorkflowExecutionContext().getWorkflowInstance().getId(); + } + + default String getWorkflowInstanceName() { + return getWorkflowExecutionContext().getWorkflowInstance().getName(); + } + + default ProcessInstance getWorkflowInstance() { + return getWorkflowExecutionContext().getWorkflowInstance(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java new file mode 100644 index 0000000000..85cee76474 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java @@ -0,0 +1,9 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecutionContext; + +public interface IWorkflowExecutionRunnableBuilder { + + IWorkflowExecutionRunnable buildWorkflowExecutionRunnable(IWorkflowExecutionContext workflowExecutionContext); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java new file mode 100644 index 0000000000..7ebf008568 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class TaskExecutionRunnableRepository { + + private Map<Integer, TaskExecuteRunnable> taskExecuteRunnableMap = new ConcurrentHashMap<>(); + + public TaskExecuteRunnable getTaskExecutionRunnableById(Integer taskInstanceId) { + return taskExecuteRunnableMap.get(taskInstanceId); + } + + public List<TaskExecuteRunnable> getActiveTaskExecutionRunnables() { + return taskExecuteRunnableMap.values().stream().filter(taskExecuteRunnable -> { + // filter the status is not finished + return !taskExecuteRunnable.getTaskInstance().getState().isFinished(); + }).collect(Collectors.toList()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java new file mode 100644 index 0000000000..e3e7420ab4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java @@ -0,0 +1,5 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public class TaskIdentify { + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java new file mode 100644 index 0000000000..daf652e0b8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface TaskTriggerConditionChecker { + + boolean taskCanTrigger(String taskName); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java new file mode 100644 index 0000000000..eb8bc0cfa1 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java @@ -0,0 +1,75 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.events.WorkflowOperationEvent; +import org.apache.dolphinscheduler.server.master.events.WorkflowOperationType; +import org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowEngine implements IWorkflowEngine { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void start() { + log.info("{} started", getClass().getName()); + } + + @Override + public void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable) { + ProcessInstance workflowInstance = workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstance(); + Integer workflowInstanceId = workflowInstance.getId(); + log.info("Triggering WorkflowExecutionRunnable: {}", workflowInstance.getName()); + workflowExecuteRunnableRepository.storeWorkflowExecutionRunnable(workflowExecuteRunnable); + workflowExecuteRunnable.getEventRepository() + .storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, WorkflowOperationType.TRIGGER)); + } + + @Override + public void pauseWorkflow(Integer workflowInstanceId) { + IWorkflowExecutionRunnable workflowExecuteRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + if (workflowExecuteRunnable == null) { + throw new WorkflowExecuteRunnableNotFoundException(workflowInstanceId); + } + log.info("Pausing WorkflowExecutionRunnable: {}", workflowExecuteRunnable.getWorkflowInstanceName()); + workflowExecuteRunnable.getEventRepository() + .storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, WorkflowOperationType.PAUSE)); + } + + @Override + public void killWorkflow(Integer workflowInstanceId) { + IWorkflowExecutionRunnable workflowExecuteRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + if (workflowExecuteRunnable == null) { + throw new WorkflowExecuteRunnableNotFoundException(workflowInstanceId); + } + log.info("Killing WorkflowExecutionRunnable: {}", workflowExecuteRunnable.getWorkflowInstanceName()); + workflowExecuteRunnable.getEventRepository() + .storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, WorkflowOperationType.KILL)); + } + + @Override + public void finalizeWorkflow(Integer workflowInstanceId) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + if (workflowExecutionRunnable == null) { + return; + } + log.info("Finalizing WorkflowExecutionRunnable: {}", workflowExecutionRunnable.getWorkflowInstanceName()); + workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowInstanceId); + } + + @Override + public void stop() { + log.info("{} stopped", getClass().getName()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java new file mode 100644 index 0000000000..b1503ed01a --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; +import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; +import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContextFactory; +import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; +import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; +import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.command.CommandService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Optional; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowExecuteRunnableFactory { + + @Autowired + private WorkflowExecuteContextFactory workflowExecuteContextFactory; + + public Optional<WorkflowExecutionRunnable> createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { + try { + Optional<IWorkflowExecuteContext> workflowExecuteRunnableContextOptional = workflowExecuteContextFactory.createWorkflowExecuteRunnableContext(command); + return workflowExecuteRunnableContextOptional.map(iWorkflowExecuteContext -> { + return WorkflowExecutionRunnable.builder().build(); + }); + } catch (Exception ex) { + throw new WorkflowCreateException("Create WorkflowExecuteRunnable failed", ex); + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java new file mode 100644 index 0000000000..acf61d46ad --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java @@ -0,0 +1,28 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowExecuteRunnableRepository { + + private final Map<Integer, IWorkflowExecutionRunnable> workflowExecutionRunnableMap = new ConcurrentHashMap<>(); + + public void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) { + workflowExecutionRunnableMap.put(workflowExecutionRunnable.getWorkflowInstanceId(), workflowExecutionRunnable); + } + + public IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer workflowInstanceId) { + return workflowExecutionRunnableMap.get(workflowInstanceId); + } + + public void removeWorkflowExecutionRunnable(Integer workflowInstanceId) { + workflowExecutionRunnableMap.remove(workflowInstanceId); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java new file mode 100644 index 0000000000..7df8cf9ad1 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java @@ -0,0 +1,21 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowExecutionContext { + + private ProcessDefinition workflowDefinition; + + private ProcessInstance workflowInstance; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java new file mode 100644 index 0000000000..aeda4ed5f7 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEvent; +import org.apache.dolphinscheduler.server.master.events.IEventRepository; +import org.apache.dolphinscheduler.server.master.events.WorkflowTriggeredEvent; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable { + + @Getter + private WorkflowExecutionContext workflowExecutionContext; + + @Getter + private IDAGEngine dagEngine; + + private IEventRepository<IEvent> eventRepository; + + public void start() { + Integer workflowInstanceId = getWorkflowInstanceId(); + eventRepository.storeEventToTail(new WorkflowTriggeredEvent(workflowInstanceId)); + dagEngine.triggerNextTasks(null); + } + + @Override + public void pause() { + dagEngine.getWorkflowExecutionDAG().getActiveTaskExecutionRunnables().stream().forEach(taskExecuteRunnable -> { + dagEngine.pauseTask(taskExecuteRunnable.getTaskInstance().getId()); + }); + } + + @Override + public void kill() { + dagEngine.getWorkflowExecutionDAG().getActiveTaskExecutionRunnables().forEach(taskExecuteRunnable -> { + dagEngine.killTask(taskExecuteRunnable.getTaskInstance().getId()); + }); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java new file mode 100644 index 0000000000..84a8b94bb2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowIdentify { + + private long workflowCode; + + private int workflowVersion; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java new file mode 100644 index 0000000000..948cb476d7 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java @@ -0,0 +1,53 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class EventDispatcher implements IEventDispatcher<IEvent> { + + @Autowired + private EventEngine eventEngine; + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void start() { + log.info(getClass().getName() + " started"); + } + + @Override + public void dispatchEvent(IEvent iEvent) { + Integer workflowInstanceId; + if (iEvent instanceof IWorkflowEvent) { + workflowInstanceId = ((IWorkflowEvent) iEvent).getWorkflowInstanceId(); + } else if (iEvent instanceof ITaskEvent) { + workflowInstanceId = ((ITaskEvent) iEvent).getWorkflowInstanceId(); + } else { + throw new IllegalArgumentException("Unsupported event type: " + iEvent.getClass().getName()); + } + + IWorkflowExecutionRunnable workflowExecuteRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + if (workflowExecuteRunnable == null) { + log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", iEvent); + return; + } + workflowExecuteRunnable.getEventRepository().storeEventToTail(iEvent); + log.debug("Success dispatch event {} to EventRepository", iEvent); + eventEngine.notify(); + } + + @Override + public void stop() { + log.info(getClass().getName() + " stopped"); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java new file mode 100644 index 0000000000..1d37f4b19b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java @@ -0,0 +1,93 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import org.apache.commons.lang3.time.StopWatch; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class EventEngine extends BaseDaemonThread { + + @Autowired + private IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Autowired + private EventFirer eventFirer; + + private final Set<Integer> firingWorkflowInstanceIds = ConcurrentHashMap.newKeySet(); + + public EventEngine() { + super("EventEngine"); + } + + @Override + public synchronized void start() { + super.start(); + log.info(getClass().getName() + " started"); + } + + @Override + public void run() { + for (;;) { + try { + StopWatch stopWatch = StopWatch.createStarted(); + fireAllActiveEvents(); + stopWatch.stop(); + log.info("Fire all active events cost: {} ms", stopWatch.getTime()); + this.wait(5_000); + } catch (Throwable throwable) { + log.error("Fire active event error", throwable); + ThreadUtils.sleep(3_000); + } + } + } + + public void fireAllActiveEvents() { + Collection<IWorkflowExecutionRunnable> workflowExecutionRunnableCollection = + workflowExecuteRunnableRepository.getAll(); + for (IWorkflowExecutionRunnable workflowExecutionRunnable : workflowExecutionRunnableCollection) { + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final Integer workflowInstanceId = workflowInstance.getId(); + final String workflowInstanceName = workflowInstance.getName(); + try { + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + if (firingWorkflowInstanceIds.contains(workflowInstanceId)) { + log.debug("WorkflowExecutionRunnable: {} is already in firing", workflowInstanceName); + return; + } + IEventRepository<IEvent> workflowEventRepository = workflowExecutionRunnable.getEventRepository(); + firingWorkflowInstanceIds.add(workflowInstanceId); + eventFirer.fireActiveEvents(workflowEventRepository) + .whenComplete((fireCount, ex) -> { + firingWorkflowInstanceIds.remove(workflowInstanceId); + if (ex != null) { + log.error("Fire event for WorkflowExecutionRunnable: {} error", workflowInstanceName, + ex); + } else { + if (fireCount > 0) { + log.info("Fire {} events for WorkflowExecutionRunnable: {} success", fireCount, + workflowInstanceName); + } + } + }); + } finally { + LogUtils.removeWorkflowInstanceIdMDC(); + } + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java new file mode 100644 index 0000000000..2782528a3f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java @@ -0,0 +1,78 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class EventFirer implements IEventFirer<IEvent> { + + private final IEventOperatorManager<IEvent> eventOperatorManager; + + private final ThreadPoolExecutor eventFireThreadPool; + + public EventFirer(IEventOperatorManager<IEvent> eventOperatorManager, MasterConfig masterConfig) { + this.eventOperatorManager = eventOperatorManager; + this.eventFireThreadPool = + ThreadUtils.newDaemonFixedThreadExecutor("EventFireThreadPool", masterConfig.getExecThreads()); + } + + @Override + public CompletableFuture<Integer> fireActiveEvents(IEventRepository<IEvent> eventRepository) { + if (eventRepository.getEventSize() == 0) { + return CompletableFuture.completedFuture(0); + } + return CompletableFuture.supplyAsync(() -> { + int fireCount = 0; + for (;;) { + IEvent event = eventRepository.poolEvent(); + if (event == null) { + break; + } + + if (event instanceof IAsyncEvent) { + fireAsyncEvent(event); + fireCount++; + continue; + } + try { + fireSyncEvent(event); + fireCount++; + } catch (Exception ex) { + if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) { + // If the event is failed due to cannot connect to DB, we should retry it + eventRepository.storeEventToHead(event); + } + throw ex; + } + } + return fireCount; + }, eventFireThreadPool); + } + + private void fireAsyncEvent(IEvent event) { + CompletableFuture.runAsync(() -> { + log.info("Begin fire IAsyncEvent: {}", event); + eventOperatorManager.getEventOperator(event).handleEvent(event); + log.info("Success fire IAsyncEvent: {}", event); + }, eventFireThreadPool).exceptionally(ex -> { + log.error("Failed to fire IAsyncEvent: {}", event, ex); + return null; + }); + } + + private void fireSyncEvent(IEvent event) { + log.info("Begin fire SyncEvent: {}", event); + eventOperatorManager.getEventOperator(event).handleEvent(event); + log.info("Success fire SyncEvent: {}", event); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java new file mode 100644 index 0000000000..a7915a168b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java @@ -0,0 +1,19 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +/** + * The event operator manager interface used to get {@link ITaskEventOperator}. + */ +@Slf4j +@Component +public class EventOperatorManager implements IEventOperatorManager<IEvent> { + + @Override + public IEventOperator<IEvent> getEventOperator(IEvent event) { + return null; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java new file mode 100644 index 0000000000..5cf0d38ff7 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java @@ -0,0 +1,36 @@ +package org.apache.dolphinscheduler.server.master.events; + +import java.util.concurrent.LinkedBlockingDeque; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventRepository implements IEventRepository<IEvent> { + + protected final LinkedBlockingDeque<IEvent> eventQueue; + + public EventRepository() { + this.eventQueue = new LinkedBlockingDeque<>(); + } + + @Override + public void storeEventToTail(IEvent event) { + eventQueue.offerLast(event); + } + + @Override + public void storeEventToHead(IEvent event) { + eventQueue.offerFirst(event); + } + + @Override + public IEvent poolEvent() { + return eventQueue.poll(); + } + + @Override + public int getEventSize() { + return eventQueue.size(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java new file mode 100644 index 0000000000..1ea4553554 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.events; + +/** + * Mark the event as AsyncEvent, if the event is marked as AsyncEvent, the event will be handled asynchronously and we don't . + */ +public interface IAsyncEvent { +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java new file mode 100644 index 0000000000..425ba97d49 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java @@ -0,0 +1,5 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface IEvent { + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java new file mode 100644 index 0000000000..e32f8bc619 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.server.master.events; + +/** + * The event dispatcher interface used to dispatch event. + * Each event should be dispatched to the corresponding workflow event queue. + */ +public interface IEventDispatcher<E> { + + void start(); + + void stop(); + + void dispatchEvent(E event); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java new file mode 100644 index 0000000000..576942089d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java @@ -0,0 +1,19 @@ +package org.apache.dolphinscheduler.server.master.events; + +import java.util.concurrent.CompletableFuture; + +/** + * The event firer interface used to fire event. + * + * @param <E> event type + */ +public interface IEventFirer<E> { + + /** + * Fire all active events in the event repository + * + * @return the count of fired success events + */ + CompletableFuture<Integer> fireActiveEvents(IEventRepository<E> eventRepository); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java new file mode 100644 index 0000000000..ef767e35d0 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.server.master.events; + +/** + * The event operator interface used to handle event. + */ +public interface IEventOperator<E> { + + /** + * Handle the given event + * + * @param event event + */ + void handleEvent(E event); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java new file mode 100644 index 0000000000..0670dc6d57 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +/** + * The event operator manager interface used to get event operator. + */ +public interface IEventOperatorManager<E> { + + /** + * Get the {@link IEventOperator} for the given event. + * + * @param event event + * @return event operator for the given event + */ + IEventOperator<E> getEventOperator(E event); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java new file mode 100644 index 0000000000..b6f3d284f5 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +/** + * The event repository interface used to store event. + */ +public interface IEventRepository<E> { + + void storeEventToTail(E event); + + void storeEventToHead(E event); + + E poolEvent(); + + int getEventSize(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java new file mode 100644 index 0000000000..95de44d0b8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java @@ -0,0 +1,4 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface ISyncEvent { +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java new file mode 100644 index 0000000000..37b2a99b15 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java @@ -0,0 +1,9 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface ITaskEvent extends IEvent { + + Integer getWorkflowInstanceId(); + + Integer getTaskInstanceId(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java new file mode 100644 index 0000000000..d2588a7027 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java @@ -0,0 +1,5 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface ITaskEventOperator<E extends ITaskEvent> extends IEventOperator<E> { + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java new file mode 100644 index 0000000000..a9dc0a31a5 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface IWorkflowEvent extends IEvent { + + /** + * The id of WorkflowInstance which the event is related to + * + * @return workflowInstanceId, shouldn't be null + */ + Integer getWorkflowInstanceId(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java new file mode 100644 index 0000000000..5d70dd1859 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java @@ -0,0 +1,6 @@ +package org.apache.dolphinscheduler.server.master.events; + +public interface IWorkflowEventOperator<E extends IWorkflowEvent> + extends + IEventOperator<E> { +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java new file mode 100644 index 0000000000..a54210606e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java @@ -0,0 +1,19 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskLogSendToRemoteEvent implements ITaskEvent { + + private Integer workflowInstanceId; + private Integer taskInstanceId; + + private String taskType; + private String logPath; +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java new file mode 100644 index 0000000000..667c0de0cc --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java @@ -0,0 +1,21 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; +import org.apache.dolphinscheduler.server.master.utils.TaskUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskLogSendToRemoteEventOperator implements ITaskEventOperator<TaskLogSendToRemoteEvent> { + + @Override + public void handleEvent(TaskLogSendToRemoteEvent event) { + if (RemoteLogUtils.isRemoteLoggingEnable() && TaskUtils.isMasterTask(event.getTaskType())) { + RemoteLogUtils.sendRemoteLog(event.getLogPath()); + log.info("Master sends task log {} to remote storage asynchronously.", event.getLogPath()); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java new file mode 100644 index 0000000000..79df2c093f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskOperationEvent implements ITaskEvent, ISyncEvent { + + private Integer workflowInstanceId; + private Integer taskInstanceId; + + private TaskOperationType taskOperationType; +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java new file mode 100644 index 0000000000..c4b8b53c13 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java @@ -0,0 +1,48 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionDAG; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskOperationEventOperator implements ITaskEventOperator<TaskOperationEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(TaskOperationEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", event); + return; + } + IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionRunnable.getDagEngine().getWorkflowExecutionDAG(); + TaskExecuteRunnable taskExecuteRunnable = + workflowExecutionDAG.getTaskExecutionRunnableById(event.getTaskInstanceId()); + if (taskExecuteRunnable == null) { + log.error("Cannot find the ITaskExecutionRunnable for event: {}", event); + } + switch (event.getTaskOperationType()) { + case DISPATCH: + taskExecuteRunnable.dispatch(); + break; + case KILL: + taskExecuteRunnable.kill(); + break; + case PAUSE: + taskExecuteRunnable.pause(); + break; + default: + log.error("Unknown TaskOperationType for event: {}", event); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java new file mode 100644 index 0000000000..0029342f62 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java @@ -0,0 +1,8 @@ +package org.apache.dolphinscheduler.server.master.events; + +public enum TaskOperationType { + + DISPATCH, KILL, PAUSE, + ; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java new file mode 100644 index 0000000000..950db9b468 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TaskSuccessEvent implements ITaskEvent { + + private Integer workflowInstanceId; + + private Integer taskInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java new file mode 100644 index 0000000000..48cf50b3b8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java @@ -0,0 +1,30 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskSuccessEventOperator implements ITaskEventOperator { + + @Autowired + private IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(ITaskEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + Integer taskInstanceId = event.getTaskInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(workflowInstanceId); + if (workflowExecutionRunnable == null) { + log.error("Cannot find the WorkflowExecutionRunnable, the event: {} will be dropped", event); + return; + } + + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java new file mode 100644 index 0000000000..e9de91a6cf --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowFailedEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + + private String failedReason; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java new file mode 100644 index 0000000000..9400209224 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java @@ -0,0 +1,43 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowFailedEventOperator + implements + IWorkflowEventOperator<WorkflowFailedEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Autowired + private EventDispatcher eventDispatcher; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Override + public void handleEvent(WorkflowFailedEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + workflowInstance.setState(WorkflowExecutionStatus.FAILURE); + processInstanceDao.updateById(workflowInstance); + log.info("Handle WorkflowExecutionRunnableFailedEvent success, set workflowInstance status to {}", + workflowInstance.getState()); + + eventDispatcher.dispatchEvent(new WorkflowFinalizeEvent(workflowInstanceId)); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java new file mode 100644 index 0000000000..e4a4acebb3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowFinalizeEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java new file mode 100644 index 0000000000..4d4055c8db --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java @@ -0,0 +1,26 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowFinalizeEventOperator + implements + IWorkflowEventOperator<WorkflowFinalizeEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowFinalizeEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + workflowExecuteRunnableRepository.removeByProcessInstanceId(workflowInstanceId); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java new file mode 100644 index 0000000000..db49459862 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java @@ -0,0 +1,20 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowFinishEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + + private WorkflowExecutionStatus workflowExecutionStatus; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java new file mode 100644 index 0000000000..e0aee2d29b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowOperationEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + private WorkflowOperationType workflowOperationType; + + public static WorkflowOperationEvent of(Integer workflowInstanceId, WorkflowOperationType workflowOperationType) { + return WorkflowOperationEvent.builder().workflowInstanceId(workflowInstanceId) + .workflowOperationType(workflowOperationType).build(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java new file mode 100644 index 0000000000..4231fb160c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java @@ -0,0 +1,68 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowOperationEventOperator implements IWorkflowEventOperator<WorkflowOperationEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowOperationEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.warn( + "Handle workflowExecutionRunnableKillOperationEvent: {} failed: WorkflowExecutionRunnable not found", + event); + return; + } + switch (event.getWorkflowOperationType()) { + case TRIGGER: + triggerWorkflow(workflowExecutionRunnable); + break; + case PAUSE: + pauseWorkflow(workflowExecutionRunnable); + break; + case KILL: + killWorkflow(workflowExecutionRunnable); + break; + default: + log.error("Unknown operationType for event: {}", event); + } + } + + private void triggerWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + try { + workflowExecutionRunnable.start(); + } catch (Throwable exception) { + if (ExceptionUtils.isDatabaseConnectedFailedException(exception)) { + throw exception; + } + ProcessInstance workflowInstance = + workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstance(); + log.error("Trigger workflow: {} failed", workflowInstance.getName(), exception); + WorkflowFailedEvent workflowExecutionRunnableFailedEvent = WorkflowFailedEvent.builder() + .workflowInstanceId(workflowInstance.getId()).failedReason(exception.getMessage()).build(); + workflowExecutionRunnable.getEventRepository().storeEventToTail(workflowExecutionRunnableFailedEvent); + } + } + + private void pauseWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + workflowExecutionRunnable.pause(); + } + + private void killWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + workflowExecutionRunnable.kill(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java new file mode 100644 index 0000000000..296c5c8dd6 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.server.master.events; + +public enum WorkflowOperationType { + + TRIGGER, + PAUSE, + KILL, + ; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java new file mode 100644 index 0000000000..ee3c6843a4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTimeoutEvent implements IWorkflowEvent, IAsyncEvent { + + private Integer workflowInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java new file mode 100644 index 0000000000..89014b8d9d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java @@ -0,0 +1,43 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutEventOperator + implements + IEventOperator<WorkflowTimeoutEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecutionRunnableRepository; + + @Autowired + private ProcessService processService; + + @Autowired + private ProcessAlertManager processAlertManager; + + @Override + public void handleEvent(WorkflowTimeoutEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecutionRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.warn("Cannot find the workflow instance by id: {}", event.getWorkflowInstanceId()); + return; + } + // we only support timeout warning for now + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + processAlertManager.sendProcessTimeoutAlert(workflowInstance, projectUser); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java new file mode 100644 index 0000000000..45fe8cab46 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java @@ -0,0 +1,21 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTriggerNextTaskEvent implements IWorkflowEvent, ISyncEvent { + + private int workflowInstanceId; + + /** + * The task name of the parent task, if it is the root task, the value is null + */ + private String parentTaskNodeName; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java new file mode 100644 index 0000000000..50535c2b2c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java @@ -0,0 +1,27 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTriggerNextTaskEventOperator + implements + IWorkflowEventOperator<WorkflowTriggerNextTaskEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowTriggerNextTaskEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + workflowExecutionRunnable.getDagEngine().triggerNextTasks(event.getParentTaskNodeName()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java new file mode 100644 index 0000000000..0c5283733d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTriggeredEvent implements IWorkflowEvent, IAsyncEvent { + + private int workflowInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java new file mode 100644 index 0000000000..e61e1f90c3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java @@ -0,0 +1,44 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTriggeredEventOperator + implements + IWorkflowEventOperator<WorkflowTriggeredEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Autowired + private ProcessService processService; + + @Autowired + private ListenerEventAlertManager listenerEventAlertManager; + + @Override + public void handleEvent(WorkflowTriggeredEvent event) { + int workflowInstanceId = event.getWorkflowInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + Long workflowDefinitionCode = + workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowDefinition().getCode(); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", "" + workflowDefinitionCode); + + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId); + listenerEventAlertManager.publishProcessStartListenerEvent(workflowInstance, projectUser); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java new file mode 100644 index 0000000000..19a11b9639 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.server.master.exception; + +public class TaskExecuteRunnableNotFoundException extends RuntimeException { + + public TaskExecuteRunnableNotFoundException(Integer workflowInstanceId) { + super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + "]"); + } + + public TaskExecuteRunnableNotFoundException(String workflowInstanceName) { + super("WorkflowExecuteRunnable not found: [name=" + workflowInstanceName + "]"); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java deleted file mode 100644 index ac37c94438..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.exception; - -public class TaskExecutionContextCreateException extends MasterException { - - public TaskExecutionContextCreateException(String message) { - super(message); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java new file mode 100644 index 0000000000..3ded58f3c6 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java @@ -0,0 +1,13 @@ +package org.apache.dolphinscheduler.server.master.exception; + +public class WorkflowExecuteRunnableNotFoundException extends RuntimeException { + + public WorkflowExecuteRunnableNotFoundException(Integer workflowInstanceId) { + super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + "]"); + } + + public WorkflowExecuteRunnableNotFoundException(String workflowInstanceName) { + super("WorkflowExecuteRunnable not found: [name=" + workflowInstanceName + "]"); + } + +}
