This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch dev_wenjun_refactorMaster in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 75ca4d4f4a7b5864ca53f0aeb8cbe36a7469d223 Author: Wenjun Ruan <[email protected]> AuthorDate: Tue Mar 5 15:48:00 2024 +0800 Refactor Master --- .../server/master/dag/BasicDAG.java | 5 ++ .../dolphinscheduler/server/master/dag/DAG.java | 45 +++++++++++ .../server/master/dag/DAGEngine.java | 75 +++++++++++++++++ .../server/master/dag/DAGEngineFactory.java | 37 +++++++++ .../server/master/dag/IDAGEngine.java | 42 ++++++++++ .../server/master/dag/IDAGEngineFactory.java | 9 +++ .../server/master/dag/IEventRepositoryFactory.java | 9 +++ .../master/dag/IEventfulExecutionRunnable.java | 8 ++ .../server/master/dag/ITaskExecutionRunnable.java | 13 +++ .../master/dag/ITaskExecutionRunnableFactory.java | 7 ++ .../server/master/dag/IWorkflowDAG.java | 25 ++++++ .../server/master/dag/IWorkflowDAGFactory.java | 7 ++ .../server/master/dag/IWorkflowEngine.java | 49 ++++++++++++ .../server/master/dag/IWorkflowExecutionDAG.java | 14 ++++ .../master/dag/IWorkflowExecutionDAGFactory.java | 6 ++ .../master/dag/IWorkflowExecutionRunnable.java | 68 ++++++++++++++++ .../master/dag/MemoryEventRepositoryFactory.java | 18 +++++ .../server/master/dag/TaskExecutionContext.java | 12 +++ .../server/master/dag/TaskExecutionRunnable.java | 28 +++++++ .../dag/TaskExecutionRunnableRepository.java | 23 ++++++ .../TaskExecutionRunnableRepositoryFactory.java | 15 ++++ .../server/master/dag/TaskIdentify.java | 5 ++ .../master/dag/TaskTriggerConditionChecker.java | 7 ++ .../server/master/dag/WorkflowEngine.java | 75 +++++++++++++++++ .../dag/WorkflowExecuteRunnableRepository.java | 33 ++++++++ .../master/dag/WorkflowExecutionContext.java | 21 +++++ .../dag/WorkflowExecutionContextFactory.java | 14 ++++ .../server/master/dag/WorkflowExecutionDAG.java | 36 +++++++++ .../master/dag/WorkflowExecutionDAGFactory.java | 29 +++++++ .../master/dag/WorkflowExecutionRunnable.java | 57 +++++++++++++ .../dag/WorkflowExecutionRunnableFactory.java | 44 ++++++++++ .../server/master/dag/WorkflowIdentify.java | 18 
+++++ .../server/master/events/EventDispatcher.java | 53 ++++++++++++ .../server/master/events/EventEngine.java | 93 ++++++++++++++++++++++ .../server/master/events/EventFirer.java | 78 ++++++++++++++++++ .../server/master/events/EventOperatorManager.java | 19 +++++ .../server/master/events/IAsyncEvent.java | 7 ++ .../server/master/events/IEvent.java | 5 ++ .../server/master/events/IEventDispatcher.java | 15 ++++ .../server/master/events/IEventFirer.java | 19 +++++ .../server/master/events/IEventOperator.java | 15 ++++ .../master/events/IEventOperatorManager.java | 16 ++++ .../server/master/events/IEventRepository.java | 16 ++++ .../server/master/events/ISyncEvent.java | 4 + .../server/master/events/ITaskEvent.java | 9 +++ .../server/master/events/ITaskEventOperator.java | 5 ++ .../server/master/events/IWorkflowEvent.java | 12 +++ .../master/events/IWorkflowEventOperator.java | 6 ++ .../master/events/MemoryEventRepository.java | 36 +++++++++ .../master/events/TaskLogSendToRemoteEvent.java | 19 +++++ .../events/TaskLogSendToRemoteEventOperator.java | 21 +++++ .../server/master/events/TaskOperationEvent.java | 18 +++++ .../master/events/TaskOperationEventOperator.java | 48 +++++++++++ .../server/master/events/TaskOperationType.java | 8 ++ .../server/master/events/TaskSuccessEvent.java | 16 ++++ .../master/events/TaskSuccessEventOperator.java | 30 +++++++ .../server/master/events/WorkflowFailedEvent.java | 18 +++++ .../master/events/WorkflowFailedEventOperator.java | 43 ++++++++++ .../master/events/WorkflowFinalizeEvent.java | 15 ++++ .../events/WorkflowFinalizeEventOperator.java | 26 ++++++ .../server/master/events/WorkflowFinishEvent.java | 20 +++++ .../master/events/WorkflowOperationEvent.java | 24 ++++++ .../events/WorkflowOperationEventOperator.java | 68 ++++++++++++++++ .../master/events/WorkflowOperationType.java | 10 +++ .../server/master/events/WorkflowTimeoutEvent.java | 16 ++++ .../events/WorkflowTimeoutEventOperator.java | 43 ++++++++++ 
.../events/WorkflowTriggerNextTaskEvent.java | 21 +++++ .../WorkflowTriggerNextTaskEventOperator.java | 27 +++++++ .../master/events/WorkflowTriggeredEvent.java | 16 ++++ .../events/WorkflowTriggeredEventOperator.java | 44 ++++++++++ .../TaskExecuteRunnableNotFoundException.java | 12 +++ .../TaskExecutionContextCreateException.java | 26 ------ .../WorkflowExecuteRunnableNotFoundException.java | 13 +++ .../master/runner/DefaultTaskExecuteRunnable.java | 3 +- .../execute/DefaultTaskExecuteRunnableFactory.java | 18 ++--- 75 files changed, 1845 insertions(+), 40 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java new file mode 100644 index 0000000000..40bffe0b23 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java @@ -0,0 +1,5 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public abstract class BasicDAG<E> implements DAG<E> { + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java new file mode 100644 index 0000000000..145744e12b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java @@ -0,0 +1,45 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * The Directed Acyclic Graph class + * + * @param <E> type of the node content. + */ +public interface DAG<E> { + + List<DAGNode<E>> getAllPostNodes(DAGNode<E> dagNode); + + List<DAGNode<E>> getAllParentNodes(DAGNode<E> dagNode); + + DAGNode<E> getDAGNode(String node); + + /** + * The node of the DAG. + * + * @param <E> content type of the node. 
+     */
+    @Data
+    @AllArgsConstructor
+    class DAGNode<E> {
+
+        private String nodeName;
+        private E nodeContent;
+    }
+
+    /**
+     * The edge of the DAG.
+     */
+    @Data
+    @AllArgsConstructor
+    class DAGEdge {
+
+        private String nodeName;
+        private String nextNodeName;
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
new file mode 100644
index 0000000000..d60d249e9e
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
@@ -0,0 +1,109 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationType;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Default {@link IDAGEngine}: turns trigger, pause and kill requests into {@link TaskOperationEvent}s and
+ * appends them to the tail of the {@link IEventRepository} this engine was built with.
+ */
+@Slf4j
+@Builder
+@AllArgsConstructor
+public class DAGEngine implements IDAGEngine {
+
+    @Getter
+    private final IWorkflowExecutionDAG workflowExecutionDAG;
+
+    private final List<TaskTriggerConditionChecker> taskTriggerConditionCheckers;
+
+    private final ITaskExecutionRunnableFactory taskExecutionRunnableFactory;
+
+    private final IEventRepository eventRepository;
+
+    /**
+     * Triggers every task that the workflow DAG returns as a post node of the given task.
+     *
+     * @param parentTaskNodeName the parent task name
+     */
+    @Override
+    public void triggerNextTasks(String parentTaskNodeName) {
+        workflowExecutionDAG.getWorkflowDAG().getPostNodeNames(parentTaskNodeName).forEach(this::triggerTask);
+    }
+
+    /**
+     * Creates the task execution runnable for the given task and stores a DISPATCH event for it.
+     * Does nothing when any {@link TaskTriggerConditionChecker} reports that the task cannot be triggered.
+     *
+     * @param taskName task name
+     */
+    @Override
+    @SneakyThrows
+    public void triggerTask(String taskName) {
+        for (TaskTriggerConditionChecker taskTriggerConditionChecker : taskTriggerConditionCheckers) {
+            if (!taskTriggerConditionChecker.taskCanTrigger(taskName)) {
+                return;
+            }
+        }
+        // todo: create Task ExecutionRunnable
+        TaskExecutionRunnable taskExecuteRunnable = workflowExecutionDAG.createTaskExecutionRunnable(taskName);
+        storeTaskOperationEvent(taskExecuteRunnable, TaskOperationType.DISPATCH);
+    }
+
+    /**
+     * Stores a PAUSE event for the given task instance; logs an error and returns when it is not found.
+     *
+     * @param taskInstanceId id of the task instance to pause
+     */
+    @Override
+    public void pauseTask(Integer taskInstanceId) {
+        TaskExecutionRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId);
+            return;
+        }
+        storeTaskOperationEvent(taskExecutionRunnable, TaskOperationType.PAUSE);
+    }
+
+    /**
+     * Stores a KILL event for the given task instance; logs an error and returns when it is not found.
+     *
+     * @param taskInstanceId id of the task instance to kill
+     */
+    @Override
+    public void killTask(Integer taskInstanceId) {
+        TaskExecutionRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId);
+            return;
+        }
+        storeTaskOperationEvent(taskExecutionRunnable, TaskOperationType.KILL);
+    }
+
+    /**
+     * Builds the operation event for the runnable's task instance and appends it to the event repository.
+     * The workflow instance id comes from {@link TaskInstance#getProcessInstanceId()}; the task instance's
+     * own id is only used as the task instance id.
+     */
+    private void storeTaskOperationEvent(TaskExecutionRunnable taskExecutionRunnable,
+                                         TaskOperationType taskOperationType) {
+        TaskInstance taskInstance = taskExecutionRunnable.getTaskExecutionContext().getTaskInstance();
+        TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
+                .workflowInstanceId(taskInstance.getProcessInstanceId())
+                .taskInstanceId(taskInstance.getId())
+                .taskOperationType(taskOperationType)
+                .build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java new file mode 100644 index 0000000000..262cb9b1fb --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java @@ -0,0 +1,37 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEventRepository; + +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class DAGEngineFactory implements IDAGEngineFactory { + + @Autowired + private IWorkflowExecutionDAGFactory workflowExecutionDAGFactory; + + @Autowired + private List<TaskTriggerConditionChecker> taskTriggerConditionCheckers; + + @Autowired + private ITaskExecutionRunnableFactory taskExecutionRunnableFactory; + + @Override + public IDAGEngine createDAGEngine(WorkflowExecutionContext workflowExecutionContext, + IEventRepository eventRepository) { + IWorkflowExecutionDAG workflowExecutionDAG = + workflowExecutionDAGFactory.createWorkflowExecutionDAG(workflowExecutionContext); + return DAGEngine.builder() + .workflowExecutionDAG(workflowExecutionDAG) + .taskExecutionRunnableFactory(taskExecutionRunnableFactory) + .taskTriggerConditionCheckers(taskTriggerConditionCheckers) + .eventRepository(eventRepository) + .build(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java new file mode 100644 index 0000000000..5e3b651872 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java @@ -0,0 +1,42 @@ +package org.apache.dolphinscheduler.server.master.dag; + +/** + * The IDAGEngine is responsible for triggering, 
killing, pausing, and finalizing tasks in {@link IWorkflowExecutionDAG}. + * <p>All DAG operations should directly use the methods in IDAGEngine; a new {@link IWorkflowExecutionDAG} should be triggered by a new IDAGEngine. + */ +public interface IDAGEngine { + + /** + * Trigger the tasks which come after the given task. + * <p> If there are no tasks after the given task node, this will try to finish the + * WorkflowExecutionRunnable. + * + * @param parentTaskNodeName the parent task name + */ + void triggerNextTasks(String parentTaskNodeName); + + /** + * Trigger the given task. + * + * @param taskName task name + */ + void triggerTask(String taskName); + + /** + * Pause the given task. + */ + void pauseTask(Integer taskId); + + /** + * Kill the given task. + */ + void killTask(Integer taskId); + + /** + * Get the {@link IWorkflowExecutionDAG} belonging to the engine. + * + * @return workflow execution DAG. + */ + IWorkflowExecutionDAG getWorkflowExecutionDAG(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java new file mode 100644 index 0000000000..9401e3bd2c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java @@ -0,0 +1,9 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEventRepository; + +public interface IDAGEngineFactory { + + IDAGEngine createDAGEngine(WorkflowExecutionContext workflowExecutionContext, IEventRepository eventRepository); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java new file mode 100644 index 0000000000..4dc260c580 --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java @@ -0,0 +1,9 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEventRepository; + +public interface IEventRepositoryFactory { + + IEventRepository createEventRepository(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java new file mode 100644 index 0000000000..b5209e3e64 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java @@ -0,0 +1,8 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEventRepository; + +public interface IEventfulExecutionRunnable { + + IEventRepository getEventRepository(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java new file mode 100644 index 0000000000..c1c54f1d12 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java @@ -0,0 +1,13 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface ITaskExecutionRunnable { + + void dispatch(); + + void kill(); + + void pause(); + + TaskExecutionContext getTaskExecutionContext(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java new file mode 100644 index 0000000000..5115825672 --- 
/dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface ITaskExecutionRunnableFactory { + + TaskExecutionRunnable createTaskExecutionRunnable(TaskExecutionContext taskExecutionContext); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java new file mode 100644 index 0000000000..20d083cf22 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; + +import java.util.List; + +public interface IWorkflowDAG extends DAG<TaskDefinition> { + + /** + * Return the post task name of given parentTaskName. + * + * @param parentTaskName parent task name, can be null. + * @return post task name list, sort by priority. + */ + List<String> getPostNodeNames(String parentTaskName); + + /** + * Get the pre task name of given taskName + * + * @param taskName task name can be null. + * @return parent task name list. 
+ */ + List<String> getParentNodeNames(String taskName); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java new file mode 100644 index 0000000000..266d979110 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface IWorkflowDAGFactory { + + IWorkflowDAG buildWorkflowDAG(WorkflowIdentify workflowIdentify); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java new file mode 100644 index 0000000000..b7c430cfe2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java @@ -0,0 +1,49 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException; + +/** + * The WorkflowEngine is responsible for starting, stopping, pausing, and finalizing workflows. + */ +public interface IWorkflowEngine { + + /** + * Start the workflow engine. + */ + void start(); + + /** + * Trigger a workflow to start. + * + * @param workflowExecuteRunnable the workflow to start + */ + void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable); + + /** + * Pause a workflow instance. + * + * @param workflowInstanceId the ID of the workflow to pause + * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found + */ + void pauseWorkflow(Integer workflowInstanceId); + + /** + * Kill a workflow instance. 
+ * + * @param workflowInstanceId the ID of the workflow to kill + * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found + */ + void killWorkflow(Integer workflowInstanceId); + + /** + * Finalize a workflow instance. Once a workflow has been finalized, it cannot receive new operations and will be removed from memory. + * + * @param workflowInstanceId the ID of the workflow to finalize + */ + void finalizeWorkflow(Integer workflowInstanceId); + + /** + * Stop the workflow engine. + */ + void stop(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java new file mode 100644 index 0000000000..eac23880ad --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java @@ -0,0 +1,14 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import java.util.List; + +public interface IWorkflowExecutionDAG { + + IWorkflowDAG getWorkflowDAG(); + + TaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId); + + List<TaskExecutionRunnable> getActiveTaskExecutionRunnables(); + + TaskExecutionRunnable createTaskExecutionRunnable(String taskName); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java new file mode 100644 index 0000000000..19a2d45cae --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java @@ -0,0 +1,6 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public interface IWorkflowExecutionDAGFactory { + + IWorkflowExecutionDAG createWorkflowExecutionDAG(WorkflowExecutionContext 
workflowExecutionContext); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java new file mode 100644 index 0000000000..bff326c12c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +/** + * The IWorkflowExecutionRunnable represents a running workflow instance; it is responsible for operating the workflow instance, e.g. start, kill, pause, timeout. + */ +public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable { + + /** + * Start the workflow instance. + */ + void start(); + + /** + * Kill the workflow instance. + */ + void kill(); + + /** + * Pause the workflow instance. + */ + void pause(); + + /** + * Get the workflow execution context. 
+ * + * @return the workflow execution context + */ + WorkflowExecutionContext getWorkflowExecutionContext(); + + /** + * Get the {@link IDAGEngine} which used to execute the dag of the workflow instance. + * + * @return dag engine. + */ + IDAGEngine getDagEngine(); + + default Integer getWorkflowInstanceId() { + return getWorkflowExecutionContext().getWorkflowInstance().getId(); + } + + default String getWorkflowInstanceName() { + return getWorkflowExecutionContext().getWorkflowInstance().getName(); + } + + default ProcessInstance getWorkflowInstance() { + return getWorkflowExecutionContext().getWorkflowInstance(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java new file mode 100644 index 0000000000..01dea77785 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.server.master.events.IEventRepository; +import org.apache.dolphinscheduler.server.master.events.MemoryEventRepository; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MemoryEventRepositoryFactory implements IEventRepositoryFactory { + + @Override + public IEventRepository createEventRepository() { + return new MemoryEventRepository(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java new file mode 100644 index 0000000000..b8b46a3078 --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import lombok.Data; + +@Data +public class TaskExecutionContext { + + private TaskInstance taskInstance; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java new file mode 100644 index 0000000000..2aeb5cfe2d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java @@ -0,0 +1,28 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Builder +public class TaskExecutionRunnable implements ITaskExecutionRunnable { + + private TaskExecutionContext taskExecutionContext; + + @Override + public void dispatch() { + + } + + @Override + public void kill() { + + } + + @Override + public void pause() { + + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java new file mode 100644 index 0000000000..e1bba388da --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java @@ -0,0 +1,23 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class TaskExecutionRunnableRepository { + + private final Map<Integer, TaskExecutionRunnable> 
taskExecuteRunnableMap = new ConcurrentHashMap<>(); + + public TaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId) { + return taskExecuteRunnableMap.get(taskInstanceId); + } + + public List<TaskExecutionRunnable> getActiveTaskExecutionRunnable() { + return taskExecuteRunnableMap.values().stream().filter(taskExecuteRunnable -> { + // filter the status is not finished + return true; + }).collect(Collectors.toList()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java new file mode 100644 index 0000000000..fbcd3f2901 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.server.master.dag; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskExecutionRunnableRepositoryFactory { + + public TaskExecutionRunnableRepository createTaskExecutionRunnableRepository() { + return new TaskExecutionRunnableRepository(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java new file mode 100644 index 0000000000..e3e7420ab4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java @@ -0,0 +1,5 @@ +package org.apache.dolphinscheduler.server.master.dag; + +public class TaskIdentify { + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
new file mode 100644
index 0000000000..daf652e0b8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface TaskTriggerConditionChecker {
+
+    boolean taskCanTrigger(String taskName);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
new file mode 100644
index 0000000000..eb8bc0cfa1
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
@@ -0,0 +1,88 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationType;
+import org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link IWorkflowEngine} backed by a {@link WorkflowExecuteRunnableRepository}: trigger, pause and kill requests
+ * are appended as {@link WorkflowOperationEvent}s to the event repository of the targeted workflow.
+ */
+@Slf4j
+@Component
+public class WorkflowEngine implements IWorkflowEngine {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info("{} started", getClass().getName());
+    }
+
+    /** Registers the runnable in the repository, then appends a TRIGGER event for it. */
+    @Override
+    public void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable) {
+        ProcessInstance instance = workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstance();
+        Integer instanceId = instance.getId();
+        log.info("Triggering WorkflowExecutionRunnable: {}", instance.getName());
+        workflowExecuteRunnableRepository.storeWorkflowExecutionRunnable(workflowExecuteRunnable);
+        appendOperation(workflowExecuteRunnable, instanceId, WorkflowOperationType.TRIGGER);
+    }
+
+    /** Appends a PAUSE event; throws {@link WorkflowExecuteRunnableNotFoundException} for an unknown id. */
+    @Override
+    public void pauseWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable runnable = requireRunnable(workflowInstanceId);
+        log.info("Pausing WorkflowExecutionRunnable: {}", runnable.getWorkflowInstanceName());
+        appendOperation(runnable, workflowInstanceId, WorkflowOperationType.PAUSE);
+    }
+
+    /** Appends a KILL event; throws {@link WorkflowExecuteRunnableNotFoundException} for an unknown id. */
+    @Override
+    public void killWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable runnable = requireRunnable(workflowInstanceId);
+        log.info("Killing WorkflowExecutionRunnable: {}", runnable.getWorkflowInstanceName());
+        appendOperation(runnable, workflowInstanceId, WorkflowOperationType.KILL);
+    }
+
+    /** Removes the runnable from the repository; an unknown id is ignored without an error. */
+    @Override
+    public void finalizeWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable runnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (runnable != null) {
+            log.info("Finalizing WorkflowExecutionRunnable: {}", runnable.getWorkflowInstanceName());
+            workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowInstanceId);
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.info("{} stopped", getClass().getName());
+    }
+
+    /** Looks up the runnable by workflow instance id, throwing when the repository has none. */
+    private IWorkflowExecutionRunnable requireRunnable(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable runnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (runnable == null) {
+            throw new WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
+        }
+        return runnable;
+    }
+
+    /** Appends a workflow operation event of the given type to the tail of the runnable's event repository. */
+    private static void appendOperation(IWorkflowExecutionRunnable runnable,
+                                        Integer workflowInstanceId,
+                                        WorkflowOperationType operationType) {
+        runnable.getEventRepository().storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, operationType));
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
new file mode 100644
index 0000000000..fd3a05ce40
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
@@ -0,0 +1,33 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecuteRunnableRepository {
+
+    private final Map<Integer, IWorkflowExecutionRunnable> workflowExecutionRunnableMap = new ConcurrentHashMap<>();
+
+    public void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) {
+        workflowExecutionRunnableMap.put(workflowExecutionRunnable.getWorkflowInstanceId(), workflowExecutionRunnable);
+    }
+
+    public IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer workflowInstanceId) {
+        return workflowExecutionRunnableMap.get(workflowInstanceId);
+    }
+
+    public Collection<IWorkflowExecutionRunnable> getActiveWorkflowExecutionRunnable() {
+        return workflowExecutionRunnableMap.values();
+    }
+
+    public void removeWorkflowExecutionRunnable(Integer workflowInstanceId) {
+        workflowExecutionRunnableMap.remove(workflowInstanceId);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
new file mode 100644
index 0000000000..7df8cf9ad1
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Plain data holder pairing a workflow definition with the workflow instance being executed.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutionContext {
+
+    private ProcessDefinition workflowDefinition;
+
+    private ProcessInstance workflowInstance;
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java
new file mode 100644
index 0000000000..a38f0e9853
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Factory meant to build a {@link WorkflowExecutionContext} from a {@link Command}.
+ */
+@Component
+public class WorkflowExecutionContextFactory {
+
+    /**
+     * NOTE(review): placeholder - currently always returns {@code null}, so callers must not
+     * dereference the result until this is implemented.
+     */
+    public WorkflowExecutionContext createWorkflowExecutionContext(Command command) {
+        return null;
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java
new file mode 100644
index 0000000000..d990f5f7c6
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+
+import lombok.Getter;
+import 
lombok.extern.slf4j.Slf4j;
+
+/**
+ * Execution-time view of a workflow: the static {@link IWorkflowDAG} plus the repository holding
+ * the {@link TaskExecutionRunnable}s of this execution. Lookups are delegated to that repository.
+ */
+@Slf4j
+public class WorkflowExecutionDAG implements IWorkflowExecutionDAG {
+
+    @Getter
+    private final IWorkflowDAG workflowDAG;
+
+    private final TaskExecutionRunnableRepository taskExecutionRunnableRepository;
+
+    public WorkflowExecutionDAG(IWorkflowDAG workflowDAG,
+                                TaskExecutionRunnableRepository taskExecutionRunnableRepository) {
+        this.workflowDAG = workflowDAG;
+        this.taskExecutionRunnableRepository = taskExecutionRunnableRepository;
+    }
+
+    @Override
+    public TaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId) {
+        return taskExecutionRunnableRepository.getTaskExecutionRunnableById(taskInstanceId);
+    }
+
+    @Override
+    public List<TaskExecutionRunnable> getActiveTaskExecutionRunnables() {
+        return taskExecutionRunnableRepository.getActiveTaskExecutionRunnable();
+    }
+
+    /**
+     * NOTE(review): placeholder - currently always returns {@code null}.
+     */
+    @Override
+    public TaskExecutionRunnable createTaskExecutionRunnable(String taskName) {
+        return null;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java
new file mode 100644
index 0000000000..8e1ff804c3
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java
@@ -0,0 +1,29 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Builds a {@link WorkflowExecutionDAG} for a workflow execution context.
+ */
+@Slf4j
+@Component
+public class WorkflowExecutionDAGFactory implements IWorkflowExecutionDAGFactory {
+
+    @Autowired
+    private IWorkflowDAGFactory workflowDAGFactory;
+
+    @Autowired
+    private TaskExecutionRunnableRepositoryFactory taskExecutionRunnableRepositoryFactory;
+
+    /**
+     * Resolves the workflow DAG by the definition's code and version, creates a fresh task
+     * runnable repository and wraps both into a {@link WorkflowExecutionDAG}.
+     */
+    @Override
+    public IWorkflowExecutionDAG createWorkflowExecutionDAG(WorkflowExecutionContext workflowExecutionContext) {
+        // Identify the definition first (code, then version), exactly as the DAG lookup needs it.
+        final WorkflowIdentify workflowIdentify = new WorkflowIdentify(
+                workflowExecutionContext.getWorkflowDefinition().getCode(),
+                workflowExecutionContext.getWorkflowDefinition().getVersion());
+
+        final IWorkflowDAG dag = workflowDAGFactory.buildWorkflowDAG(workflowIdentify);
+        final TaskExecutionRunnableRepository taskRunnables =
+                taskExecutionRunnableRepositoryFactory.createTaskExecutionRunnableRepository();
+        return new WorkflowExecutionDAG(dag, taskRunnables);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
new file mode 100644
index 0000000000..44b5020d09
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.WorkflowTriggeredEvent;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Runtime representation of one workflow execution: its context, the DAG engine driving its tasks
+ * and the event repository receiving its events.
+ */
+@Slf4j
+@Getter
+@Builder
+@AllArgsConstructor
+public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable {
+
+    private final WorkflowExecutionContext workflowExecutionContext;
+
+    private final IDAGEngine dagEngine;
+
+    private final IEventRepository eventRepository;
+
+    /**
+     * Asks the DAG engine to trigger the next tasks, then records a {@link WorkflowTriggeredEvent}.
+     * NOTE(review): the {@code null} argument presumably means "start from the DAG roots" - confirm
+     * against IDAGEngine.
+     */
+    public void start() {
+        dagEngine.triggerNextTasks(null);
+        eventRepository.storeEventToTail(new WorkflowTriggeredEvent(getWorkflowInstanceId()));
+    }
+
+    /**
+     * Requests a pause for every currently active task of this workflow.
+     */
+    @Override
+    public void pause() {
+        final IWorkflowExecutionDAG workflowExecutionDAG = dagEngine.getWorkflowExecutionDAG();
+        workflowExecutionDAG.getActiveTaskExecutionRunnables()
+                .forEach(activeTask -> dagEngine.pauseTask(
+                        activeTask.getTaskExecutionContext().getTaskInstance().getId()));
+    }
+
+    /**
+     * Requests a kill for every currently active task of this workflow.
+     */
+    @Override
+    public void kill() {
+        final IWorkflowExecutionDAG workflowExecutionDAG = dagEngine.getWorkflowExecutionDAG();
+        workflowExecutionDAG.getActiveTaskExecutionRunnables()
+                .forEach(activeTask -> dagEngine.killTask(
+                        activeTask.getTaskExecutionContext().getTaskInstance().getId()));
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java
new file mode 100644
index 0000000000..c1eacf5d8a
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Assembles {@link WorkflowExecutionRunnable}s from an execution context.
+ */
+@Slf4j
+@Component
+public class WorkflowExecutionRunnableFactory {
+
+    @Autowired
+    private IDAGEngineFactory dagEngineFactory;
+
+    @Autowired
+    private IEventRepositoryFactory eventRepositoryFactory;
+
+    /**
+     * Creates a new event repository, builds a DAG engine bound to that same repository and wires
+     * both, together with the context, into a new {@link WorkflowExecutionRunnable}.
+     */
+    public WorkflowExecutionRunnable createWorkflowExecuteRunnable(WorkflowExecutionContext workflowExecutionContext) {
+        final IEventRepository workflowEventRepository = eventRepositoryFactory.createEventRepository();
+        // The engine and the runnable must share one repository instance.
+        final IDAGEngine workflowDagEngine =
+                dagEngineFactory.createDAGEngine(workflowExecutionContext, workflowEventRepository);
+        return WorkflowExecutionRunnable.builder()
+                .workflowExecutionContext(workflowExecutionContext)
+                .dagEngine(workflowDagEngine)
+                .eventRepository(workflowEventRepository)
+                .build();
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
new file mode 100644
index 0000000000..84a8b94bb2
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
@@ -0,0 +1,18 @@
+package 
org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Identifies one version of a workflow definition by its code and version number.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowIdentify {
+
+    private long workflowCode;
+
+    private int workflowVersion;
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
new file mode 100644
index 0000000000..948cb476d7
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
@@ -0,0 +1,53 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Routes an incoming event to the event repository of the workflow it belongs to and then wakes
+ * up the {@link EventEngine} so the event is fired without waiting for the next polling round.
+ */
+@Slf4j
+@Component
+public class EventDispatcher implements IEventDispatcher<IEvent> {
+
+    @Autowired
+    private EventEngine eventEngine;
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info("{} started", getClass().getName());
+    }
+
+    /**
+     * Appends the event to the tail of the owning workflow's event repository.
+     * If the workflow is not registered, the event is logged and dropped.
+     *
+     * @param iEvent an {@link IWorkflowEvent} or {@link ITaskEvent}
+     * @throws IllegalArgumentException if the event is of neither type
+     */
+    @Override
+    public void dispatchEvent(IEvent iEvent) {
+        Integer workflowInstanceId;
+        if (iEvent instanceof IWorkflowEvent) {
+            workflowInstanceId = ((IWorkflowEvent) iEvent).getWorkflowInstanceId();
+        } else if (iEvent instanceof ITaskEvent) {
+            workflowInstanceId = ((ITaskEvent) iEvent).getWorkflowInstanceId();
+        } else {
+            throw new IllegalArgumentException("Unsupported event type: " + iEvent.getClass().getName());
+        }
+
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", iEvent);
+            return;
+        }
+        workflowExecuteRunnable.getEventRepository().storeEventToTail(iEvent);
+        log.debug("Success dispatch event {} to EventRepository", iEvent);
+        // Object#notify() may only be called while holding the object's monitor; calling it bare on
+        // the engine thread threw IllegalMonitorStateException. The engine now owns the signalling.
+        eventEngine.wakeup();
+    }
+
+    @Override
+    public void stop() {
+        log.info("{} stopped", getClass().getName());
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
new file mode 100644
index 0000000000..fbe92c69c6
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
@@ -0,0 +1,93 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Daemon thread that repeatedly asks the {@link EventFirer} to fire the pending events of every
+ * registered workflow. Between two rounds it sleeps for up to 5 seconds or until {@link #wakeup()}
+ * is called. At most one firing task per workflow instance is in flight at any time.
+ */
+@Slf4j
+@Component
+public class EventEngine extends BaseDaemonThread {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Autowired
+    private EventFirer eventFirer;
+
+    private final Set<Integer> firingWorkflowInstanceIds = ConcurrentHashMap.newKeySet();
+
+    /** Dedicated monitor for the idle wait, so the Thread object's own monitor is not used. */
+    private final Object wakeupLock = new Object();
+
+    /** Set by {@link #wakeup()}, consumed by the engine loop; guarded by {@link #wakeupLock}. */
+    private boolean wakeupRequested;
+
+    public EventEngine() {
+        super("EventEngine");
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info("{} started", getClass().getName());
+    }
+
+    /**
+     * Asks the engine to start its next firing round immediately instead of waiting for the
+     * polling interval to elapse. Safe to call from any thread.
+     */
+    public void wakeup() {
+        synchronized (wakeupLock) {
+            wakeupRequested = true;
+            wakeupLock.notifyAll();
+        }
+    }
+
+    @Override
+    public void run() {
+        for (;;) {
+            try {
+                StopWatch stopWatch = StopWatch.createStarted();
+                fireAllActiveEvents();
+                stopWatch.stop();
+                log.info("Fire all active events cost: {} ms", stopWatch.getTime());
+                awaitWakeup(5_000);
+            } catch (InterruptedException interruptedException) {
+                // Keep the interrupt flag and leave the loop instead of spinning on a wait that
+                // would fail again immediately.
+                Thread.currentThread().interrupt();
+                log.warn("EventEngine is interrupted, stop firing events");
+                return;
+            } catch (Throwable throwable) {
+                log.error("Fire active event error", throwable);
+                ThreadUtils.sleep(3_000);
+            }
+        }
+    }
+
+    /**
+     * Fires the pending events of every registered workflow. A workflow whose previous firing
+     * task has not completed yet is skipped in this round; all other workflows are still handled.
+     */
+    public void fireAllActiveEvents() {
+        Collection<IWorkflowExecutionRunnable> workflowExecutionRunnableCollection =
+                workflowExecuteRunnableRepository.getActiveWorkflowExecutionRunnable();
+        for (IWorkflowExecutionRunnable workflowExecutionRunnable : workflowExecutionRunnableCollection) {
+            ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
+            final Integer workflowInstanceId = workflowInstance.getId();
+            final String workflowInstanceName = workflowInstance.getName();
+            try {
+                LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
+                // add() is the atomic "check and mark"; false means a firing task is still running.
+                if (!firingWorkflowInstanceIds.add(workflowInstanceId)) {
+                    log.debug("WorkflowExecutionRunnable: {} is already in firing", workflowInstanceName);
+                    // Skip only this workflow; a return here would starve every workflow after it.
+                    continue;
+                }
+                fireWorkflowEvents(workflowExecutionRunnable, workflowInstanceId, workflowInstanceName);
+            } finally {
+                LogUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+    /**
+     * Submits the firing task for one workflow and clears its "firing" marker once the task
+     * completes, or immediately if the task could not even be submitted.
+     */
+    private void fireWorkflowEvents(IWorkflowExecutionRunnable workflowExecutionRunnable,
+                                    Integer workflowInstanceId,
+                                    String workflowInstanceName) {
+        try {
+            eventFirer.fireActiveEvents(workflowExecutionRunnable.getEventRepository())
+                    .whenComplete((fireCount, ex) -> {
+                        firingWorkflowInstanceIds.remove(workflowInstanceId);
+                        if (ex != null) {
+                            log.error("Fire event for WorkflowExecutionRunnable: {} error", workflowInstanceName,
+                                    ex);
+                        } else {
+                            if (fireCount > 0) {
+                                log.info("Fire {} events for WorkflowExecutionRunnable: {} success", fireCount,
+                                        workflowInstanceName);
+                            }
+                        }
+                    });
+        } catch (RuntimeException ex) {
+            // Submission itself failed (e.g. the pool rejected the task): without this the id would
+            // stay marked as firing forever and the workflow would never be fired again.
+            firingWorkflowInstanceIds.remove(workflowInstanceId);
+            throw ex;
+        }
+    }
+
+    /**
+     * Blocks until {@link #wakeup()} is called or the timeout elapses. A wakeup requested while the
+     * engine was busy firing is remembered, so it is not lost.
+     */
+    private void awaitWakeup(long timeoutMillis) throws InterruptedException {
+        synchronized (wakeupLock) {
+            if (!wakeupRequested) {
+                wakeupLock.wait(timeoutMillis);
+            }
+            wakeupRequested = false;
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
new file mode 100644
index 0000000000..2782528a3f
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
@@ -0,0 +1,78 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Drains an event repository on a dedicated thread pool and hands every event to the operator
+ * returned by the {@link IEventOperatorManager}.
+ */
+@Slf4j
+@Component
+public class EventFirer implements IEventFirer<IEvent> {
+
+    private final IEventOperatorManager<IEvent> eventOperatorManager;
+
+    private final ThreadPoolExecutor eventFireThreadPool;
+
+    public EventFirer(IEventOperatorManager<IEvent> eventOperatorManager, MasterConfig masterConfig) {
+        this.eventOperatorManager = eventOperatorManager;
+        this.eventFireThreadPool =
+                ThreadUtils.newDaemonFixedThreadExecutor("EventFireThreadPool", masterConfig.getExecThreads());
+    }
+
+    /**
+     * Polls events from the repository until it is empty and fires them in order.
+     * <ul>
+     * <li>{@link IAsyncEvent}s are submitted to the pool and counted without waiting for them.</li>
+     * <li>All other events are handled inline. If handling fails the returned future completes
+     * exceptionally and the remaining events stay in the repository; when the failure is a
+     * database connection failure the event is put back at the head so it is retried.</li>
+     * </ul>
+     *
+     * @return a future holding the number of fired events; already completed with 0 if the
+     *         repository is empty
+     */
+    @Override
+    public CompletableFuture<Integer> fireActiveEvents(IEventRepository<IEvent> eventRepository) {
+        if (eventRepository.getEventSize() == 0) {
+            return CompletableFuture.completedFuture(0);
+        }
+        return CompletableFuture.supplyAsync(() -> {
+            int fireCount = 0;
+            for (;;) {
+                IEvent event = eventRepository.poolEvent();
+                if (event == null) {
+                    break;
+                }
+
+                if (event instanceof IAsyncEvent) {
+                    fireAsyncEvent(event);
+                    fireCount++;
+                    continue;
+                }
+                try {
+                    fireSyncEvent(event);
+                    fireCount++;
+                } catch (Exception ex) {
+                    if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) {
+                        // If the event is failed due to cannot connect to DB, we should retry it
+                        eventRepository.storeEventToHead(event);
+                    }
+                    throw ex;
+                }
+            }
+            return fireCount;
+        }, eventFireThreadPool);
+    }
+
+    /** Handles the event on the pool; a failure is only logged, never propagated. */
+    private void fireAsyncEvent(IEvent event) {
+        CompletableFuture.runAsync(() -> {
+            log.info("Begin fire IAsyncEvent: {}", event);
+            eventOperatorManager.getEventOperator(event).handleEvent(event);
+            log.info("Success fire IAsyncEvent: {}", event);
+        }, eventFireThreadPool).exceptionally(ex -> {
+            log.error("Failed to fire IAsyncEvent: {}", event, ex);
+            return null;
+        });
+    }
+
+    /** Handles the event on the calling thread; failures propagate to the caller. */
+    private void fireSyncEvent(IEvent event) {
+        log.info("Begin fire SyncEvent: {}", event);
+        eventOperatorManager.getEventOperator(event).handleEvent(event);
+        log.info("Success fire SyncEvent: {}", event);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
new file mode 100644
index 0000000000..a7915a168b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Default {@link IEventOperatorManager}, used to get the {@link IEventOperator} of an event.
+ */
+@Slf4j
+@Component
+public class EventOperatorManager implements IEventOperatorManager<IEvent> {
+
+    /**
+     * NOTE(review): placeholder - currently always returns {@code null}, so every event fired
+     * through {@link EventFirer} fails with a NullPointerException until this is implemented.
+     */
+    @Override
+    public IEventOperator<IEvent> getEventOperator(IEvent event) {
+        return null;
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
new file mode 100644
index 0000000000..1ea4553554
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Marks an event as asynchronous: the event is handled asynchronously and the firer does not
+ * wait for its handling to complete before moving on to the next event.
+ */
+public interface IAsyncEvent {
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
new file mode 100644
index 0000000000..425ba97d49
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Root type of all events handled by the master event engine.
+ */
+public interface IEvent {
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
new file mode 100644
index 0000000000..e32f8bc619
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event dispatcher interface used to dispatch event.
+ * Each event should be dispatched to the corresponding workflow event queue.
+ *
+ * @param <E> event type
+ */
+public interface IEventDispatcher<E> {
+
+    void start();
+
+    void stop();
+
+    void dispatchEvent(E event);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
new file mode 100644
index 0000000000..576942089d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The event firer interface used to fire event.
+ *
+ * @param <E> event type
+ */
+public interface IEventFirer<E extends IEvent> {
+
+    /**
+     * Fire all active events in the event repository
+     *
+     * @param eventRepository the repository to drain
+     * @return the count of fired success events
+     */
+    CompletableFuture<Integer> fireActiveEvents(IEventRepository<E> eventRepository);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
new file mode 100644
index 0000000000..ef767e35d0
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator interface used to handle event.
+ *
+ * @param <E> event type
+ */
+public interface IEventOperator<E> {
+
+    /**
+     * Handle the given event
+     *
+     * @param event event
+     */
+    void handleEvent(E event);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
new file mode 100644
index 0000000000..0670dc6d57
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator manager interface used to get event operator.
+ *
+ * @param <E> event type
+ */
+public interface IEventOperatorManager<E> {
+
+    /**
+     * Get the {@link IEventOperator} for the given event.
+     *
+     * @param event event
+     * @return event operator for the given event
+     */
+    IEventOperator<E> getEventOperator(E event);
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
new file mode 100644
index 0000000000..eda74a2ee7
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event repository interface used to store event.
+ * <p>
+ * The type parameter is bounded by {@link IEvent}, so code that still uses the raw type keeps
+ * compiling: after erasure every method works on {@link IEvent}, exactly as before.
+ *
+ * @param <E> event type
+ */
+public interface IEventRepository<E extends IEvent> {
+
+    /** Appends the event after all events currently stored. */
+    void storeEventToTail(E event);
+
+    /** Inserts the event before all events currently stored, so it is polled next. */
+    void storeEventToHead(E event);
+
+    /**
+     * Removes and returns the head event.
+     *
+     * @return the head event, or {@code null} if the repository is empty
+     */
+    E poolEvent();
+
+    /** @return the number of events currently stored */
+    int getEventSize();
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
new file mode 100644
index 0000000000..95de44d0b8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
@@ -0,0 +1,4 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Marker counterpart of {@link IAsyncEvent}.
+ * NOTE(review): EventFirer only checks for IAsyncEvent; every other event is already handled inline.
+ */
+public interface ISyncEvent {
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
new file mode 100644
index 0000000000..37b2a99b15
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An event related to a task instance of a workflow instance.
+ */
+public interface ITaskEvent extends IEvent {
+
+    Integer getWorkflowInstanceId();
+
+    Integer getTaskInstanceId();
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
new file mode 100644
index 0000000000..d2588a7027
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEventOperator} restricted to {@link ITaskEvent}s.
+ *
+ * @param <E> task event type
+ */
+public interface ITaskEventOperator<E extends ITaskEvent> extends IEventOperator<E> {
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
new file mode 100644
index 0000000000..a9dc0a31a5
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An event related to a workflow instance as a whole.
+ */
+public interface IWorkflowEvent extends IEvent {
+
+    /**
+     * The id of WorkflowInstance which the event is related to
+     *
+     * @return workflowInstanceId, shouldn't be null
+     */
+    Integer getWorkflowInstanceId();
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
new file mode 100644
index 0000000000..5d70dd1859
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEventOperator} restricted to {@link IWorkflowEvent}s.
+ *
+ * @param <E> workflow event type
+ */
+public interface IWorkflowEventOperator<E extends IWorkflowEvent> extends IEventOperator<E> {
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java
new file mode 100644
index 0000000000..ccaf6d2611
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link IEventRepository} kept entirely in memory, backed by a {@link LinkedBlockingDeque}
+ * created without a capacity bound.
+ */
+@Slf4j
+public class MemoryEventRepository implements IEventRepository {
+
+    protected final LinkedBlockingDeque<IEvent> eventQueue = new LinkedBlockingDeque<>();
+
+    /** Enqueues the event behind everything already stored. */
+    @Override
+    public void storeEventToTail(IEvent event) {
+        eventQueue.offerLast(event);
+    }
+
+    /** Enqueues the event in front of everything already stored. */
+    @Override
+    public void storeEventToHead(IEvent event) {
+        eventQueue.offerFirst(event);
+    }
+
+    /** Takes the head event without blocking; {@code null} when nothing is stored. */
+    @Override
+    public IEvent poolEvent() {
+        return eventQueue.pollFirst();
+    }
+
+    @Override
+    public int getEventSize() {
+        return eventQueue.size();
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
new file mode 100644
index 0000000000..a54210606e
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Task event carrying the task type and the log path of a task instance, consumed by
+ * {@link TaskLogSendToRemoteEventOperator}.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskLogSendToRemoteEvent implements ITaskEvent {
+
+    private Integer workflowInstanceId;
+    private Integer taskInstanceId;
+
+    private String taskType;
+    private String logPath;
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
new file mode 100644
index 0000000000..667c0de0cc
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import 
org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskLogSendToRemoteEventOperator implements ITaskEventOperator<TaskLogSendToRemoteEvent> { + + @Override + public void handleEvent(TaskLogSendToRemoteEvent event) { + if (RemoteLogUtils.isRemoteLoggingEnable() && TaskUtils.isMasterTask(event.getTaskType())) { + RemoteLogUtils.sendRemoteLog(event.getLogPath()); + log.info("Master sends task log {} to remote storage asynchronously.", event.getLogPath()); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java new file mode 100644 index 0000000000..79df2c093f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskOperationEvent implements ITaskEvent, ISyncEvent { + + private Integer workflowInstanceId; + private Integer taskInstanceId; + + private TaskOperationType taskOperationType; +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java new file mode 100644 index 0000000000..749894fc87 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java @@ -0,0 +1,48 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionDAG; +import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.TaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskOperationEventOperator implements ITaskEventOperator<TaskOperationEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(TaskOperationEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", event); + return; + } + IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionRunnable.getDagEngine().getWorkflowExecutionDAG(); + TaskExecutionRunnable taskExecutionRunnable = + workflowExecutionDAG.getTaskExecutionRunnableById(event.getTaskInstanceId()); + if (taskExecutionRunnable == null) { + log.error("Cannot find the ITaskExecutionRunnable for event: {}", event); + } + switch (event.getTaskOperationType()) { + case DISPATCH: + taskExecutionRunnable.dispatch(); + break; + case KILL: + taskExecutionRunnable.kill(); + break; + case PAUSE: + taskExecutionRunnable.pause(); + break; + default: + log.error("Unknown TaskOperationType for event: {}", event); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java new file mode 100644 index 0000000000..0029342f62 --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java @@ -0,0 +1,8 @@ +package org.apache.dolphinscheduler.server.master.events; + +public enum TaskOperationType { + + DISPATCH, KILL, PAUSE, + ; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java new file mode 100644 index 0000000000..950db9b468 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TaskSuccessEvent implements ITaskEvent { + + private Integer workflowInstanceId; + + private Integer taskInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java new file mode 100644 index 0000000000..48cf50b3b8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java @@ -0,0 +1,30 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskSuccessEventOperator implements ITaskEventOperator { + + @Autowired + private 
IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(ITaskEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + Integer taskInstanceId = event.getTaskInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(workflowInstanceId); + if (workflowExecutionRunnable == null) { + log.error("Cannot find the WorkflowExecutionRunnable, the event: {} will be dropped", event); + return; + } + + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java new file mode 100644 index 0000000000..e9de91a6cf --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowFailedEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + + private String failedReason; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java new file mode 100644 index 0000000000..9400209224 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java @@ -0,0 +1,43 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowFailedEventOperator + implements + IWorkflowEventOperator<WorkflowFailedEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Autowired + private EventDispatcher eventDispatcher; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Override + public void handleEvent(WorkflowFailedEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + workflowInstance.setState(WorkflowExecutionStatus.FAILURE); + processInstanceDao.updateById(workflowInstance); + log.info("Handle WorkflowExecutionRunnableFailedEvent success, set workflowInstance status to {}", + workflowInstance.getState()); + + eventDispatcher.dispatchEvent(new WorkflowFinalizeEvent(workflowInstanceId)); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java new file mode 100644 index 0000000000..e4a4acebb3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.server.master.events; + 
+import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowFinalizeEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java new file mode 100644 index 0000000000..4d4055c8db --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java @@ -0,0 +1,26 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowFinalizeEventOperator + implements + IWorkflowEventOperator<WorkflowFinalizeEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowFinalizeEvent event) { + Integer workflowInstanceId = event.getWorkflowInstanceId(); + workflowExecuteRunnableRepository.removeByProcessInstanceId(workflowInstanceId); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java new file mode 100644 index 0000000000..db49459862 --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java @@ -0,0 +1,20 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowFinishEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + + private WorkflowExecutionStatus workflowExecutionStatus; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java new file mode 100644 index 0000000000..e0aee2d29b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowOperationEvent implements IWorkflowEvent, ISyncEvent { + + private Integer workflowInstanceId; + private WorkflowOperationType workflowOperationType; + + public static WorkflowOperationEvent of(Integer workflowInstanceId, WorkflowOperationType workflowOperationType) { + return WorkflowOperationEvent.builder().workflowInstanceId(workflowInstanceId) + .workflowOperationType(workflowOperationType).build(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java new file mode 100644 index 0000000000..4231fb160c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java @@ -0,0 +1,68 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowOperationEventOperator implements IWorkflowEventOperator<WorkflowOperationEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowOperationEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.warn( + "Handle workflowExecutionRunnableKillOperationEvent: {} failed: WorkflowExecutionRunnable not found", + event); + return; + } + switch (event.getWorkflowOperationType()) { + case TRIGGER: + triggerWorkflow(workflowExecutionRunnable); + break; + case PAUSE: + pauseWorkflow(workflowExecutionRunnable); + break; + case KILL: + killWorkflow(workflowExecutionRunnable); + break; + default: + log.error("Unknown operationType for event: {}", event); + } + } + + private void triggerWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + try { + workflowExecutionRunnable.start(); + } catch (Throwable exception) { + if 
(ExceptionUtils.isDatabaseConnectedFailedException(exception)) { + throw exception; + } + ProcessInstance workflowInstance = + workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstance(); + log.error("Trigger workflow: {} failed", workflowInstance.getName(), exception); + WorkflowFailedEvent workflowExecutionRunnableFailedEvent = WorkflowFailedEvent.builder() + .workflowInstanceId(workflowInstance.getId()).failedReason(exception.getMessage()).build(); + workflowExecutionRunnable.getEventRepository().storeEventToTail(workflowExecutionRunnableFailedEvent); + } + } + + private void pauseWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + workflowExecutionRunnable.pause(); + } + + private void killWorkflow(IWorkflowExecutionRunnable workflowExecutionRunnable) { + workflowExecutionRunnable.kill(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java new file mode 100644 index 0000000000..296c5c8dd6 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.server.master.events; + +public enum WorkflowOperationType { + + TRIGGER, + PAUSE, + KILL, + ; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java new file mode 100644 index 0000000000..ee3c6843a4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import 
lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTimeoutEvent implements IWorkflowEvent, IAsyncEvent { + + private Integer workflowInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java new file mode 100644 index 0000000000..89014b8d9d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java @@ -0,0 +1,43 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutEventOperator + implements + IEventOperator<WorkflowTimeoutEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecutionRunnableRepository; + + @Autowired + private ProcessService processService; + + @Autowired + private ProcessAlertManager processAlertManager; + + @Override + public void handleEvent(WorkflowTimeoutEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecutionRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + if (workflowExecutionRunnable == null) { + log.warn("Cannot find 
the workflow instance by id: {}", event.getWorkflowInstanceId()); + return; + } + // we only support timeout warning for now + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + processAlertManager.sendProcessTimeoutAlert(workflowInstance, projectUser); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java new file mode 100644 index 0000000000..45fe8cab46 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java @@ -0,0 +1,21 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTriggerNextTaskEvent implements IWorkflowEvent, ISyncEvent { + + /** + * The id of the workflow instance this event relates to. + * Declared as Integer so the Lombok-generated getter returns the same type as + * IWorkflowEvent#getWorkflowInstanceId(); an int getter cannot implement an Integer-returning method. + */ + private Integer workflowInstanceId; + + /** + * The task name of the parent task, if it is the root task, the value is null + */ + private String parentTaskNodeName; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java new file mode 100644 index 0000000000..50535c2b2c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java @@ -0,0 +1,27 @@ +package org.apache.dolphinscheduler.server.master.events; + +import
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTriggerNextTaskEventOperator + implements + IWorkflowEventOperator<WorkflowTriggerNextTaskEvent> { + + @Autowired + private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecuteRunnableRepository; + + @Override + public void handleEvent(WorkflowTriggerNextTaskEvent event) { + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId()); + workflowExecutionRunnable.getDagEngine().triggerNextTasks(event.getParentTaskNodeName()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java new file mode 100644 index 0000000000..0c5283733d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.events; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowTriggeredEvent implements IWorkflowEvent, IAsyncEvent { + + private int workflowInstanceId; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java new file mode 100644 
index 0000000000..e61e1f90c3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java @@ -0,0 +1,44 @@ +package org.apache.dolphinscheduler.server.master.events; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTriggeredEventOperator + implements + IWorkflowEventOperator<WorkflowTriggeredEvent> { + + @Autowired + private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository; + + @Autowired + private ProcessService processService; + + @Autowired + private ListenerEventAlertManager listenerEventAlertManager; + + @Override + public void handleEvent(WorkflowTriggeredEvent event) { + int workflowInstanceId = event.getWorkflowInstanceId(); + IWorkflowExecutionRunnable workflowExecutionRunnable = + workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId); + Long workflowDefinitionCode = + workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowDefinition().getCode(); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", "" + workflowDefinitionCode); + + ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId); + 
listenerEventAlertManager.publishProcessStartListenerEvent(workflowInstance, projectUser); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java new file mode 100644 index 0000000000..19a11b9639 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.server.master.exception; + +public class TaskExecuteRunnableNotFoundException extends RuntimeException { + + /** + * @param workflowInstanceId the workflow instance id reported in the exception message + */ + public TaskExecuteRunnableNotFoundException(Integer workflowInstanceId) { + // The message names TaskExecuteRunnable so it can be told apart from WorkflowExecuteRunnableNotFoundException. + super("TaskExecuteRunnable not found: [workflowInstanceId=" + workflowInstanceId + "]"); + } + + /** + * @param workflowInstanceName the workflow instance name reported in the exception message + */ + public TaskExecuteRunnableNotFoundException(String workflowInstanceName) { + super("TaskExecuteRunnable not found: [workflowInstanceName=" + workflowInstanceName + "]"); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java deleted file mode 100644 index ac37c94438..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.exception; - -public class TaskExecutionContextCreateException extends MasterException { - - public TaskExecutionContextCreateException(String message) { - super(message); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java new file mode 100644 index 0000000000..3ded58f3c6 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java @@ -0,0 +1,13 @@ +package org.apache.dolphinscheduler.server.master.exception; + +public class WorkflowExecuteRunnableNotFoundException extends RuntimeException { + + public WorkflowExecuteRunnableNotFoundException(Integer workflowInstanceId) { + super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + "]"); + } + + public WorkflowExecuteRunnableNotFoundException(String workflowInstanceName) { + super("WorkflowExecuteRunnable not found: [name=" + workflowInstanceName + "]"); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java index c1b13717bd..a1e60e0a6e 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java @@ -28,8 +28,7 @@ public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager; - public DefaultTaskExecuteRunnable(ProcessInstance workflowInstance, - TaskInstance taskInstance, + public DefaultTaskExecuteRunnable(ProcessInstance workflowInstance, TaskInstance taskInstance, TaskExecutionContext taskExecutionContext, TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager) { super(workflowInstance, taskInstance, taskExecutionContext); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java index ab749b5861..6c1a4e8569 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java @@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException; -import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException; import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory; 
import org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory; @@ -46,17 +44,13 @@ public class DefaultTaskExecuteRunnableFactory implements TaskExecuteRunnableFac private TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager; @Override - public DefaultTaskExecuteRunnable createTaskExecuteRunnable(TaskInstance taskInstance) throws TaskExecuteRunnableCreateException { + public DefaultTaskExecuteRunnable createTaskExecuteRunnable(TaskInstance taskInstance) { WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()); - try { - return new DefaultTaskExecuteRunnable( - workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(), - taskInstance, - taskExecutionContextFactory.createTaskExecutionContext(taskInstance), - taskExecuteRunnableOperatorManager); - } catch (TaskExecutionContextCreateException ex) { - throw new TaskExecuteRunnableCreateException("Create DefaultTaskExecuteRunnable failed", ex); - } + return new DefaultTaskExecuteRunnable( + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(), + taskInstance, + taskExecutionContextFactory.createTaskExecutionContext(taskInstance), + taskExecuteRunnableOperatorManager); } }
