This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_refactorMaster
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 75ca4d4f4a7b5864ca53f0aeb8cbe36a7469d223
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Mar 5 15:48:00 2024 +0800

    Refactor Master
---
 .../server/master/dag/BasicDAG.java                |  5 ++
 .../dolphinscheduler/server/master/dag/DAG.java    | 45 +++++++++++
 .../server/master/dag/DAGEngine.java               | 75 +++++++++++++++++
 .../server/master/dag/DAGEngineFactory.java        | 37 +++++++++
 .../server/master/dag/IDAGEngine.java              | 42 ++++++++++
 .../server/master/dag/IDAGEngineFactory.java       |  9 +++
 .../server/master/dag/IEventRepositoryFactory.java |  9 +++
 .../master/dag/IEventfulExecutionRunnable.java     |  8 ++
 .../server/master/dag/ITaskExecutionRunnable.java  | 13 +++
 .../master/dag/ITaskExecutionRunnableFactory.java  |  7 ++
 .../server/master/dag/IWorkflowDAG.java            | 25 ++++++
 .../server/master/dag/IWorkflowDAGFactory.java     |  7 ++
 .../server/master/dag/IWorkflowEngine.java         | 49 ++++++++++++
 .../server/master/dag/IWorkflowExecutionDAG.java   | 14 ++++
 .../master/dag/IWorkflowExecutionDAGFactory.java   |  6 ++
 .../master/dag/IWorkflowExecutionRunnable.java     | 68 ++++++++++++++++
 .../master/dag/MemoryEventRepositoryFactory.java   | 18 +++++
 .../server/master/dag/TaskExecutionContext.java    | 12 +++
 .../server/master/dag/TaskExecutionRunnable.java   | 28 +++++++
 .../dag/TaskExecutionRunnableRepository.java       | 23 ++++++
 .../TaskExecutionRunnableRepositoryFactory.java    | 15 ++++
 .../server/master/dag/TaskIdentify.java            |  5 ++
 .../master/dag/TaskTriggerConditionChecker.java    |  7 ++
 .../server/master/dag/WorkflowEngine.java          | 75 +++++++++++++++++
 .../dag/WorkflowExecuteRunnableRepository.java     | 33 ++++++++
 .../master/dag/WorkflowExecutionContext.java       | 21 +++++
 .../dag/WorkflowExecutionContextFactory.java       | 14 ++++
 .../server/master/dag/WorkflowExecutionDAG.java    | 36 +++++++++
 .../master/dag/WorkflowExecutionDAGFactory.java    | 29 +++++++
 .../master/dag/WorkflowExecutionRunnable.java      | 57 +++++++++++++
 .../dag/WorkflowExecutionRunnableFactory.java      | 44 ++++++++++
 .../server/master/dag/WorkflowIdentify.java        | 18 +++++
 .../server/master/events/EventDispatcher.java      | 53 ++++++++++++
 .../server/master/events/EventEngine.java          | 93 ++++++++++++++++++++++
 .../server/master/events/EventFirer.java           | 78 ++++++++++++++++++
 .../server/master/events/EventOperatorManager.java | 19 +++++
 .../server/master/events/IAsyncEvent.java          |  7 ++
 .../server/master/events/IEvent.java               |  5 ++
 .../server/master/events/IEventDispatcher.java     | 15 ++++
 .../server/master/events/IEventFirer.java          | 19 +++++
 .../server/master/events/IEventOperator.java       | 15 ++++
 .../master/events/IEventOperatorManager.java       | 16 ++++
 .../server/master/events/IEventRepository.java     | 16 ++++
 .../server/master/events/ISyncEvent.java           |  4 +
 .../server/master/events/ITaskEvent.java           |  9 +++
 .../server/master/events/ITaskEventOperator.java   |  5 ++
 .../server/master/events/IWorkflowEvent.java       | 12 +++
 .../master/events/IWorkflowEventOperator.java      |  6 ++
 .../master/events/MemoryEventRepository.java       | 36 +++++++++
 .../master/events/TaskLogSendToRemoteEvent.java    | 19 +++++
 .../events/TaskLogSendToRemoteEventOperator.java   | 21 +++++
 .../server/master/events/TaskOperationEvent.java   | 18 +++++
 .../master/events/TaskOperationEventOperator.java  | 48 +++++++++++
 .../server/master/events/TaskOperationType.java    |  8 ++
 .../server/master/events/TaskSuccessEvent.java     | 16 ++++
 .../master/events/TaskSuccessEventOperator.java    | 30 +++++++
 .../server/master/events/WorkflowFailedEvent.java  | 18 +++++
 .../master/events/WorkflowFailedEventOperator.java | 43 ++++++++++
 .../master/events/WorkflowFinalizeEvent.java       | 15 ++++
 .../events/WorkflowFinalizeEventOperator.java      | 26 ++++++
 .../server/master/events/WorkflowFinishEvent.java  | 20 +++++
 .../master/events/WorkflowOperationEvent.java      | 24 ++++++
 .../events/WorkflowOperationEventOperator.java     | 68 ++++++++++++++++
 .../master/events/WorkflowOperationType.java       | 10 +++
 .../server/master/events/WorkflowTimeoutEvent.java | 16 ++++
 .../events/WorkflowTimeoutEventOperator.java       | 43 ++++++++++
 .../events/WorkflowTriggerNextTaskEvent.java       | 21 +++++
 .../WorkflowTriggerNextTaskEventOperator.java      | 27 +++++++
 .../master/events/WorkflowTriggeredEvent.java      | 16 ++++
 .../events/WorkflowTriggeredEventOperator.java     | 44 ++++++++++
 .../TaskExecuteRunnableNotFoundException.java      | 12 +++
 .../TaskExecutionContextCreateException.java       | 26 ------
 .../WorkflowExecuteRunnableNotFoundException.java  | 13 +++
 .../master/runner/DefaultTaskExecuteRunnable.java  |  3 +-
 .../execute/DefaultTaskExecuteRunnableFactory.java | 18 ++---
 75 files changed, 1845 insertions(+), 40 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java
new file mode 100644
index 0000000000..40bffe0b23
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/BasicDAG.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public abstract class BasicDAG<E> implements DAG<E> {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java
new file mode 100644
index 0000000000..145744e12b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAG.java
@@ -0,0 +1,45 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * The Directed Acyclic Graph class
+ *
+ * @param <E> type of the node content.
+ */
+public interface DAG<E> {
+
+    List<DAGNode<E>> getAllPostNodes(DAGNode<E> dagNode);
+
+    List<DAGNode<E>> getAllParentNodes(DAGNode<E> dagNode);
+
+    DAGNode<E> getDAGNode(String node);
+
+    /**
+     * The node of the DAG.
+     *
+     * @param <E> content type of the node.
+     */
+    @Data
+    @AllArgsConstructor
+    class DAGNode<E> {
+
+        private String nodeName;
+        private E nodeContent;
+    }
+
+    /**
+     * The edge of the DAG.
+     */
+    @Data
+    @AllArgsConstructor
+    class DAGEdge {
+
+        private String nodeName;
+        private String nextNodeName;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
new file mode 100644
index 0000000000..d60d249e9e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
@@ -0,0 +1,75 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationType;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Builder
+@AllArgsConstructor
+public class DAGEngine implements IDAGEngine {
+
+    @Getter
+    private final IWorkflowExecutionDAG workflowExecutionDAG;
+
+    private final List<TaskTriggerConditionChecker> 
taskTriggerConditionCheckers;
+
+    private final ITaskExecutionRunnableFactory taskExecutionRunnableFactory;
+
+    private final IEventRepository eventRepository;
+
+    @Override
+    public void triggerNextTasks(String parentTaskNodeName) {
+        
workflowExecutionDAG.getWorkflowDAG().getPostNodeNames(parentTaskNodeName).forEach(this::triggerTask);
+    }
+
+    @Override
+    @SneakyThrows
+    public void triggerTask(String taskName) {
+        for (TaskTriggerConditionChecker taskTriggerConditionChecker : 
taskTriggerConditionCheckers) {
+            if (!taskTriggerConditionChecker.taskCanTrigger(taskName)) {
+                return;
+            }
+        }
+        // todo: create Task ExecutionRunnable
+        TaskExecutionRunnable taskExecuteRunnable = 
workflowExecutionDAG.createTaskExecutionRunnable(taskName);
+        TaskInstance taskInstance = 
taskExecuteRunnable.getTaskExecutionContext().getTaskInstance();
+        TaskOperationEvent taskOperationEvent = 
TaskOperationEvent.builder().workflowInstanceId(taskInstance.getId())
+                
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.DISPATCH).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+    @Override
+    public void pauseTask(Integer taskInstanceId) {
+        TaskExecutionRunnable taskExecutionRunnable = 
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", 
taskInstanceId);
+            return;
+        }
+        TaskInstance taskInstance = 
taskExecutionRunnable.getTaskExecutionContext().getTaskInstance();
+        TaskOperationEvent taskOperationEvent =
+                
TaskOperationEvent.builder().workflowInstanceId(taskInstance.getProcessInstanceId())
+                        
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.PAUSE).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+    @Override
+    public void killTask(Integer taskInstanceId) {
+        TaskExecutionRunnable taskExecutionRunnable = 
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", 
taskInstanceId);
+            return;
+        }
+
+        TaskInstance taskInstance = 
taskExecutionRunnable.getTaskExecutionContext().getTaskInstance();
+        TaskOperationEvent taskOperationEvent = 
TaskOperationEvent.builder().workflowInstanceId(taskInstance.getId())
+                
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.KILL).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java
new file mode 100644
index 0000000000..262cb9b1fb
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngineFactory.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class DAGEngineFactory implements IDAGEngineFactory {
+
+    @Autowired
+    private IWorkflowExecutionDAGFactory workflowExecutionDAGFactory;
+
+    @Autowired
+    private List<TaskTriggerConditionChecker> taskTriggerConditionCheckers;
+
+    @Autowired
+    private ITaskExecutionRunnableFactory taskExecutionRunnableFactory;
+
+    @Override
+    public IDAGEngine createDAGEngine(WorkflowExecutionContext 
workflowExecutionContext,
+                                      IEventRepository eventRepository) {
+        IWorkflowExecutionDAG workflowExecutionDAG =
+                
workflowExecutionDAGFactory.createWorkflowExecutionDAG(workflowExecutionContext);
+        return DAGEngine.builder()
+                .workflowExecutionDAG(workflowExecutionDAG)
+                .taskExecutionRunnableFactory(taskExecutionRunnableFactory)
+                .taskTriggerConditionCheckers(taskTriggerConditionCheckers)
+                .eventRepository(eventRepository)
+                .build();
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
new file mode 100644
index 0000000000..5e3b651872
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
@@ -0,0 +1,42 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+/**
+ * The IDAGEngine is responsible for triggering, killing, pausing, and 
finalizing task in {@link IWorkflowExecutionDAG}.
+ * <p>All DAG operation should directly use the method in IDAGEngine, new 
{@link IWorkflowExecutionDAG} should be triggered by new IDAGEngine.
+ */
+public interface IDAGEngine {
+
+    /**
+     * Trigger the tasks which are post of the given task.
+     * <P> If there are no task after the given taskNode, will try to finish 
the WorkflowExecutionRunnable.
+     * <p> If the
+     *
+     * @param parentTaskNodeName the parent task name
+     */
+    void triggerNextTasks(String parentTaskNodeName);
+
+    /**
+     * Trigger the given task
+     *
+     * @param taskName task name
+     */
+    void triggerTask(String taskName);
+
+    /**
+     * Pause the given task.
+     */
+    void pauseTask(Integer taskId);
+
+    /**
+     * Kill the given task.
+     */
+    void killTask(Integer taskId);
+
+    /**
+     * Get {@link IWorkflowExecutionDAG} belong to the Engine.
+     *
+     * @return workflow execution DAG.
+     */
+    IWorkflowExecutionDAG getWorkflowExecutionDAG();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java
new file mode 100644
index 0000000000..9401e3bd2c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineFactory.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+public interface IDAGEngineFactory {
+
+    IDAGEngine createDAGEngine(WorkflowExecutionContext 
workflowExecutionContext, IEventRepository eventRepository);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java
new file mode 100644
index 0000000000..4dc260c580
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventRepositoryFactory.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+public interface IEventRepositoryFactory {
+
+    IEventRepository createEventRepository();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
new file mode 100644
index 0000000000..b5209e3e64
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
@@ -0,0 +1,8 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+public interface IEventfulExecutionRunnable {
+
+    IEventRepository getEventRepository();
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java
new file mode 100644
index 0000000000..c1c54f1d12
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnable.java
@@ -0,0 +1,13 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface ITaskExecutionRunnable {
+
+    void dispatch();
+
+    void kill();
+
+    void pause();
+
+    TaskExecutionContext getTaskExecutionContext();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java
new file mode 100644
index 0000000000..5115825672
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/ITaskExecutionRunnableFactory.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface ITaskExecutionRunnableFactory {
+
+    TaskExecutionRunnable createTaskExecutionRunnable(TaskExecutionContext 
taskExecutionContext);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
new file mode 100644
index 0000000000..20d083cf22
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+
+import java.util.List;
+
+public interface IWorkflowDAG extends DAG<TaskDefinition> {
+
+    /**
+     * Return the post task name of given parentTaskName.
+     *
+     * @param parentTaskName parent task name, can be null.
+     * @return post task name list, sort by priority.
+     */
+    List<String> getPostNodeNames(String parentTaskName);
+
+    /**
+     * Get the pre task name of given taskName
+     *
+     * @param taskName task name can be null.
+     * @return parent task name list.
+     */
+    List<String> getParentNodeNames(String taskName);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java
new file mode 100644
index 0000000000..266d979110
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGFactory.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface IWorkflowDAGFactory {
+
+    IWorkflowDAG buildWorkflowDAG(WorkflowIdentify workflowIdentify);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
new file mode 100644
index 0000000000..b7c430cfe2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
@@ -0,0 +1,49 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import 
org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;
+
+/**
+ * The WorkflowEngine is responsible for starting, stopping, pausing, and 
finalizing workflows.
+ */
+public interface IWorkflowEngine {
+
+    /**
+     * Start the workflow engine.
+     */
+    void start();
+
+    /**
+     * Trigger a workflow to start.
+     *
+     * @param workflowExecuteRunnable the workflow to start
+     */
+    void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable);
+
+    /**
+     * Pause a workflow instance.
+     *
+     * @param workflowInstanceId the ID of the workflow to pause
+     * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not 
found
+     */
+    void pauseWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Kill a workflow instance.
+     *
+     * @param workflowInstanceId the ID of the workflow to stop
+     * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not 
found
+     */
+    void killWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Finalize a workflow instance. Once a workflow has been finalized, then 
it cannot receive new operation, and will be removed from memory.
+     *
+     * @param workflowInstanceId the ID of the workflow to finalize
+     */
+    void finalizeWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Stop the workflow engine.
+     */
+    void stop();
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
new file mode 100644
index 0000000000..eac23880ad
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+
+public interface IWorkflowExecutionDAG {
+
+    IWorkflowDAG getWorkflowDAG();
+
+    TaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId);
+
+    List<TaskExecutionRunnable> getActiveTaskExecutionRunnables();
+
+    TaskExecutionRunnable createTaskExecutionRunnable(String taskName);
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java
new file mode 100644
index 0000000000..19a2d45cae
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGFactory.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface IWorkflowExecutionDAGFactory {
+
+    IWorkflowExecutionDAG createWorkflowExecutionDAG(WorkflowExecutionContext 
workflowExecutionContext);
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
new file mode 100644
index 0000000000..bff326c12c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+/**
+ * The IWorkflowExecuteRunnable represent a running workflow instance, it is 
responsible for operate the workflow instance. e.g. start, kill, pause, timeout.
+ */
+public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable 
{
+
+    /**
+     * Start the workflow instance.
+     */
+    void start();
+
+    /**
+     * Kill the workflow instance.
+     */
+    void kill();
+
+    /**
+     * Pause the workflow instance.
+     */
+    void pause();
+
+    /**
+     * Get the workflow execution context.
+     *
+     * @return the workflow execution context
+     */
+    WorkflowExecutionContext getWorkflowExecutionContext();
+
+    /**
+     * Get the {@link IDAGEngine} which used to execute the dag of the 
workflow instance.
+     *
+     * @return dag engine.
+     */
+    IDAGEngine getDagEngine();
+
+    default Integer getWorkflowInstanceId() {
+        return getWorkflowExecutionContext().getWorkflowInstance().getId();
+    }
+
+    default String getWorkflowInstanceName() {
+        return getWorkflowExecutionContext().getWorkflowInstance().getName();
+    }
+
+    default ProcessInstance getWorkflowInstance() {
+        return getWorkflowExecutionContext().getWorkflowInstance();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java
new file mode 100644
index 0000000000..01dea77785
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/MemoryEventRepositoryFactory.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.MemoryEventRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MemoryEventRepositoryFactory implements IEventRepositoryFactory {
+
+    @Override
+    public IEventRepository createEventRepository() {
+        return new MemoryEventRepository();
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java
new file mode 100644
index 0000000000..b8b46a3078
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionContext.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import lombok.Data;
+
+@Data
+public class TaskExecutionContext {
+
+    private TaskInstance taskInstance;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java
new file mode 100644
index 0000000000..2aeb5cfe2d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnable.java
@@ -0,0 +1,28 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Builder
+public class TaskExecutionRunnable implements ITaskExecutionRunnable {
+
+    private TaskExecutionContext taskExecutionContext;
+
+    @Override
+    public void dispatch() {
+
+    }
+
+    @Override
+    public void kill() {
+
+    }
+
+    @Override
+    public void pause() {
+
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
new file mode 100644
index 0000000000..e1bba388da
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
@@ -0,0 +1,23 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class TaskExecutionRunnableRepository {
+
+    private final Map<Integer, TaskExecutionRunnable> taskExecuteRunnableMap = 
new ConcurrentHashMap<>();
+
+    public TaskExecutionRunnable getTaskExecutionRunnableById(Integer 
taskInstanceId) {
+        return taskExecuteRunnableMap.get(taskInstanceId);
+    }
+
+    public List<TaskExecutionRunnable> getActiveTaskExecutionRunnable() {
+        return 
taskExecuteRunnableMap.values().stream().filter(taskExecuteRunnable -> {
+            // filter the status is not finished
+            return true;
+        }).collect(Collectors.toList());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java
new file mode 100644
index 0000000000..fbcd3f2901
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepositoryFactory.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class TaskExecutionRunnableRepositoryFactory {
+
+    public TaskExecutionRunnableRepository 
createTaskExecutionRunnableRepository() {
+        return new TaskExecutionRunnableRepository();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
new file mode 100644
index 0000000000..e3e7420ab4
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public class TaskIdentify {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
new file mode 100644
index 0000000000..daf652e0b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface TaskTriggerConditionChecker {
+
+    boolean taskCanTrigger(String taskName);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
new file mode 100644
index 0000000000..eb8bc0cfa1
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
@@ -0,0 +1,75 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationType;
+import 
org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowEngine implements IWorkflowEngine {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info("{} started", getClass().getName());
+    }
+
+    @Override
+    public void triggerWorkflow(IWorkflowExecutionRunnable 
workflowExecuteRunnable) {
+        ProcessInstance workflowInstance = 
workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstance();
+        Integer workflowInstanceId = workflowInstance.getId();
+        log.info("Triggering WorkflowExecutionRunnable: {}", 
workflowInstance.getName());
+        
workflowExecuteRunnableRepository.storeWorkflowExecutionRunnable(workflowExecuteRunnable);
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.TRIGGER));
+    }
+
+    @Override
+    public void pauseWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            throw new 
WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
+        }
+        log.info("Pausing WorkflowExecutionRunnable: {}", 
workflowExecuteRunnable.getWorkflowInstanceName());
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.PAUSE));
+    }
+
+    @Override
+    public void killWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            throw new 
WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
+        }
+        log.info("Killing WorkflowExecutionRunnable: {}", 
workflowExecuteRunnable.getWorkflowInstanceName());
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.KILL));
+    }
+
+    @Override
+    public void finalizeWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            return;
+        }
+        log.info("Finalizing WorkflowExecutionRunnable: {}", 
workflowExecutionRunnable.getWorkflowInstanceName());
+        
workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowInstanceId);
+    }
+
+    @Override
+    public void stop() {
+        log.info("{} stopped", getClass().getName());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
new file mode 100644
index 0000000000..fd3a05ce40
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
@@ -0,0 +1,33 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecuteRunnableRepository {
+
+    private final Map<Integer, IWorkflowExecutionRunnable> 
workflowExecutionRunnableMap = new ConcurrentHashMap<>();
+
+    public void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        
workflowExecutionRunnableMap.put(workflowExecutionRunnable.getWorkflowInstanceId(),
 workflowExecutionRunnable);
+    }
+
+    public IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer 
workflowInstanceId) {
+        return workflowExecutionRunnableMap.get(workflowInstanceId);
+    }
+
+    public Collection<IWorkflowExecutionRunnable> 
getActiveWorkflowExecutionRunnable() {
+        return workflowExecutionRunnableMap.values();
+    }
+
+    public void removeWorkflowExecutionRunnable(Integer workflowInstanceId) {
+        workflowExecutionRunnableMap.remove(workflowInstanceId);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
new file mode 100644
index 0000000000..7df8cf9ad1
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutionContext {
+
+    private ProcessDefinition workflowDefinition;
+
+    private ProcessInstance workflowInstance;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java
new file mode 100644
index 0000000000..a38f0e9853
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContextFactory.java
@@ -0,0 +1,14 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class WorkflowExecutionContextFactory {
+
+    public WorkflowExecutionContext createWorkflowExecutionContext(Command 
command) {
+        return null;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java
new file mode 100644
index 0000000000..d990f5f7c6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAG.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WorkflowExecutionDAG implements IWorkflowExecutionDAG {
+
+    @Getter
+    private final IWorkflowDAG workflowDAG;
+
+    private final TaskExecutionRunnableRepository 
taskExecutionRunnableRepository;
+
+    public WorkflowExecutionDAG(IWorkflowDAG workflowDAG,
+                                TaskExecutionRunnableRepository 
taskExecutionRunnableRepository) {
+        this.workflowDAG = workflowDAG;
+        this.taskExecutionRunnableRepository = taskExecutionRunnableRepository;
+    }
+
+    @Override
+    public TaskExecutionRunnable getTaskExecutionRunnableById(Integer 
taskInstanceId) {
+        return 
taskExecutionRunnableRepository.getTaskExecutionRunnableById(taskInstanceId);
+    }
+
+    @Override
+    public List<TaskExecutionRunnable> getActiveTaskExecutionRunnables() {
+        return 
taskExecutionRunnableRepository.getActiveTaskExecutionRunnable();
+    }
+
+    @Override
+    public TaskExecutionRunnable createTaskExecutionRunnable(String taskName) {
+        return null;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java
new file mode 100644
index 0000000000..8e1ff804c3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionDAGFactory.java
@@ -0,0 +1,29 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecutionDAGFactory implements 
IWorkflowExecutionDAGFactory {
+
+    @Autowired
+    private IWorkflowDAGFactory workflowDAGFactory;
+
+    @Autowired
+    private TaskExecutionRunnableRepositoryFactory 
taskExecutionRunnableRepositoryFactory;
+
+    @Override
+    public IWorkflowExecutionDAG 
createWorkflowExecutionDAG(WorkflowExecutionContext workflowExecutionContext) {
+        long workflowDefinitionCode = 
workflowExecutionContext.getWorkflowDefinition().getCode();
+        int workflowDefinitionVersion = 
workflowExecutionContext.getWorkflowDefinition().getVersion();
+
+        IWorkflowDAG workflowDAG = workflowDAGFactory
+                .buildWorkflowDAG(new WorkflowIdentify(workflowDefinitionCode, 
workflowDefinitionVersion));
+        TaskExecutionRunnableRepository taskExecutionRunnableRepository =
+                
taskExecutionRunnableRepositoryFactory.createTaskExecutionRunnableRepository();
+        return new WorkflowExecutionDAG(workflowDAG, 
taskExecutionRunnableRepository);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
new file mode 100644
index 0000000000..44b5020d09
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.WorkflowTriggeredEvent;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Builder
+@AllArgsConstructor
+public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable {
+
+    private final WorkflowExecutionContext workflowExecutionContext;
+
+    private final IDAGEngine dagEngine;
+
+    private final IEventRepository eventRepository;
+
+    public void start() {
+        dagEngine.triggerNextTasks(null);
+        eventRepository.storeEventToTail(new 
WorkflowTriggeredEvent(getWorkflowInstanceId()));
+    }
+
+    @Override
+    public void pause() {
+        
dagEngine.getWorkflowExecutionDAG().getActiveTaskExecutionRunnables().forEach(taskExecuteRunnable
 -> dagEngine
+                
.pauseTask(taskExecuteRunnable.getTaskExecutionContext().getTaskInstance().getId()));
+    }
+
+    @Override
+    public void kill() {
+        
dagEngine.getWorkflowExecutionDAG().getActiveTaskExecutionRunnables().forEach(taskExecuteRunnable
 -> dagEngine
+                
.killTask(taskExecuteRunnable.getTaskExecutionContext().getTaskInstance().getId()));
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java
new file mode 100644
index 0000000000..c1eacf5d8a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnableFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecutionRunnableFactory {
+
+    @Autowired
+    private IDAGEngineFactory dagEngineFactory;
+
+    @Autowired
+    private IEventRepositoryFactory eventRepositoryFactory;
+
+    public WorkflowExecutionRunnable 
createWorkflowExecuteRunnable(WorkflowExecutionContext 
workflowExecutionContext) {
+        IEventRepository eventRepository = 
eventRepositoryFactory.createEventRepository();
+        IDAGEngine dagEngine = 
dagEngineFactory.createDAGEngine(workflowExecutionContext, eventRepository);
+        return 
WorkflowExecutionRunnable.builder().workflowExecutionContext(workflowExecutionContext)
+                .eventRepository(eventRepository).dagEngine(dagEngine).build();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
new file mode 100644
index 0000000000..84a8b94bb2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowIdentify {
+
+    private long workflowCode;
+
+    private int workflowVersion;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
new file mode 100644
index 0000000000..948cb476d7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
@@ -0,0 +1,53 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class EventDispatcher implements IEventDispatcher<IEvent> {
+
+    @Autowired
+    private EventEngine eventEngine;
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info(getClass().getName() + " started");
+    }
+
+    @Override
+    public void dispatchEvent(IEvent iEvent) {
+        Integer workflowInstanceId;
+        if (iEvent instanceof IWorkflowEvent) {
+            workflowInstanceId = ((IWorkflowEvent) 
iEvent).getWorkflowInstanceId();
+        } else if (iEvent instanceof ITaskEvent) {
+            workflowInstanceId = ((ITaskEvent) iEvent).getWorkflowInstanceId();
+        } else {
+            throw new IllegalArgumentException("Unsupported event type: " + 
iEvent.getClass().getName());
+        }
+
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            log.error("Cannot find the IWorkflowExecutionRunnable for event: 
{}", iEvent);
+            return;
+        }
+        workflowExecuteRunnable.getEventRepository().storeEventToTail(iEvent);
+        log.debug("Success dispatch event {} to EventRepository", iEvent);
+        eventEngine.notify();
+    }
+
+    @Override
+    public void stop() {
+        log.info(getClass().getName() + " stopped");
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
new file mode 100644
index 0000000000..fbe92c69c6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
@@ -0,0 +1,93 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class EventEngine extends BaseDaemonThread {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Autowired
+    private EventFirer eventFirer;
+
+    private final Set<Integer> firingWorkflowInstanceIds = 
ConcurrentHashMap.newKeySet();
+
+    public EventEngine() {
+        super("EventEngine");
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info(getClass().getName() + " started");
+    }
+
+    @Override
+    public void run() {
+        for (;;) {
+            try {
+                StopWatch stopWatch = StopWatch.createStarted();
+                fireAllActiveEvents();
+                stopWatch.stop();
+                log.info("Fire all active events cost: {} ms", 
stopWatch.getTime());
+                this.wait(5_000);
+            } catch (Throwable throwable) {
+                log.error("Fire active event error", throwable);
+                ThreadUtils.sleep(3_000);
+            }
+        }
+    }
+
+    public void fireAllActiveEvents() {
+        Collection<IWorkflowExecutionRunnable> 
workflowExecutionRunnableCollection =
+                
workflowExecuteRunnableRepository.getActiveWorkflowExecutionRunnable();
+        for (IWorkflowExecutionRunnable workflowExecutionRunnable : 
workflowExecutionRunnableCollection) {
+            ProcessInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+            final Integer workflowInstanceId = workflowInstance.getId();
+            final String workflowInstanceName = workflowInstance.getName();
+            try {
+                LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
+                if (firingWorkflowInstanceIds.contains(workflowInstanceId)) {
+                    log.debug("WorkflowExecutionRunnable: {} is already in 
firing", workflowInstanceName);
+                    return;
+                }
+                IEventRepository<IEvent> workflowEventRepository = 
workflowExecutionRunnable.getEventRepository();
+                firingWorkflowInstanceIds.add(workflowInstanceId);
+                eventFirer.fireActiveEvents(workflowEventRepository)
+                        .whenComplete((fireCount, ex) -> {
+                            
firingWorkflowInstanceIds.remove(workflowInstanceId);
+                            if (ex != null) {
+                                log.error("Fire event for 
WorkflowExecutionRunnable: {} error", workflowInstanceName,
+                                        ex);
+                            } else {
+                                if (fireCount > 0) {
+                                    log.info("Fire {} events for 
WorkflowExecutionRunnable: {} success", fireCount,
+                                            workflowInstanceName);
+                                }
+                            }
+                        });
+            } finally {
+                LogUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
new file mode 100644
index 0000000000..2782528a3f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
@@ -0,0 +1,78 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class EventFirer implements IEventFirer<IEvent> {
+
+    private final IEventOperatorManager<IEvent> eventOperatorManager;
+
+    private final ThreadPoolExecutor eventFireThreadPool;
+
+    public EventFirer(IEventOperatorManager<IEvent> eventOperatorManager, 
MasterConfig masterConfig) {
+        this.eventOperatorManager = eventOperatorManager;
+        this.eventFireThreadPool =
+                
ThreadUtils.newDaemonFixedThreadExecutor("EventFireThreadPool", 
masterConfig.getExecThreads());
+    }
+
+    @Override
+    public CompletableFuture<Integer> 
fireActiveEvents(IEventRepository<IEvent> eventRepository) {
+        if (eventRepository.getEventSize() == 0) {
+            return CompletableFuture.completedFuture(0);
+        }
+        return CompletableFuture.supplyAsync(() -> {
+            int fireCount = 0;
+            for (;;) {
+                IEvent event = eventRepository.poolEvent();
+                if (event == null) {
+                    break;
+                }
+
+                if (event instanceof IAsyncEvent) {
+                    fireAsyncEvent(event);
+                    fireCount++;
+                    continue;
+                }
+                try {
+                    fireSyncEvent(event);
+                    fireCount++;
+                } catch (Exception ex) {
+                    if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) 
{
+                        // If the event is failed due to cannot connect to DB, 
we should retry it
+                        eventRepository.storeEventToHead(event);
+                    }
+                    throw ex;
+                }
+            }
+            return fireCount;
+        }, eventFireThreadPool);
+    }
+
+    private void fireAsyncEvent(IEvent event) {
+        CompletableFuture.runAsync(() -> {
+            log.info("Begin fire IAsyncEvent: {}", event);
+            eventOperatorManager.getEventOperator(event).handleEvent(event);
+            log.info("Success fire IAsyncEvent: {}", event);
+        }, eventFireThreadPool).exceptionally(ex -> {
+            log.error("Failed to fire IAsyncEvent: {}", event, ex);
+            return null;
+        });
+    }
+
+    private void fireSyncEvent(IEvent event) {
+        log.info("Begin fire SyncEvent: {}", event);
+        eventOperatorManager.getEventOperator(event).handleEvent(event);
+        log.info("Success fire SyncEvent: {}", event);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
new file mode 100644
index 0000000000..a7915a168b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * The event operator manager interface used to get {@link ITaskEventOperator}.
+ */
+@Slf4j
+@Component
+public class EventOperatorManager implements IEventOperatorManager<IEvent> {
+
+    @Override
+    public IEventOperator<IEvent> getEventOperator(IEvent event) {
+        return null;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
new file mode 100644
index 0000000000..1ea4553554
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Mark the event as AsyncEvent, if the event is marked as AsyncEvent, the 
event will be handled asynchronously and we don't .
+ */
+public interface IAsyncEvent {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
new file mode 100644
index 0000000000..425ba97d49
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface IEvent {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
new file mode 100644
index 0000000000..e32f8bc619
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event dispatcher interface used to dispatch event.
+ * Each event should be dispatched to the corresponding workflow event queue.
+ */
+public interface IEventDispatcher<E> {
+
+    void start();
+
+    void stop();
+
+    void dispatchEvent(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
new file mode 100644
index 0000000000..576942089d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The event firer interface used to fire event.
+ *
+ * @param <E> event type
+ */
+public interface IEventFirer<E> {
+
+    /**
+     * Fire all active events in the event repository
+     *
+     * @return the count of fired success events
+     */
+    CompletableFuture<Integer> fireActiveEvents(IEventRepository<E> 
eventRepository);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
new file mode 100644
index 0000000000..ef767e35d0
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator interface used to handle event.
+ */
+public interface IEventOperator<E> {
+
+    /**
+     * Handle the given event
+     *
+     * @param event event
+     */
+    void handleEvent(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
new file mode 100644
index 0000000000..0670dc6d57
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator manager interface used to get event operator.
+ */
+public interface IEventOperatorManager<E> {
+
+    /**
+     * Get the {@link IEventOperator} for the given event.
+     *
+     * @param event event
+     * @return event operator for the given event
+     */
+    IEventOperator<E> getEventOperator(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
new file mode 100644
index 0000000000..eda74a2ee7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event repository interface used to store event.
+ */
+public interface IEventRepository {
+
+    void storeEventToTail(IEvent event);
+
+    void storeEventToHead(IEvent event);
+
+    IEvent poolEvent();
+
+    int getEventSize();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
new file mode 100644
index 0000000000..95de44d0b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
@@ -0,0 +1,4 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface ISyncEvent {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
new file mode 100644
index 0000000000..37b2a99b15
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface ITaskEvent extends IEvent {
+
+    Integer getWorkflowInstanceId();
+
+    Integer getTaskInstanceId();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
new file mode 100644
index 0000000000..d2588a7027
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface ITaskEventOperator<E extends ITaskEvent> extends 
IEventOperator<E> {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
new file mode 100644
index 0000000000..a9dc0a31a5
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface IWorkflowEvent extends IEvent {
+
+    /**
+     * The id of WorkflowInstance which the event is related to
+     *
+     * @return workflowInstanceId, shouldn't be null
+     */
+    Integer getWorkflowInstanceId();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
new file mode 100644
index 0000000000..5d70dd1859
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public interface IWorkflowEventOperator<E extends IWorkflowEvent>
+        extends
+            IEventOperator<E> {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java
new file mode 100644
index 0000000000..ccaf6d2611
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/MemoryEventRepository.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MemoryEventRepository implements IEventRepository {
+
+    protected final LinkedBlockingDeque<IEvent> eventQueue;
+
+    public MemoryEventRepository() {
+        this.eventQueue = new LinkedBlockingDeque<>();
+    }
+
+    @Override
+    public void storeEventToTail(IEvent event) {
+        eventQueue.offerLast(event);
+    }
+
+    @Override
+    public void storeEventToHead(IEvent event) {
+        eventQueue.offerFirst(event);
+    }
+
+    @Override
+    public IEvent poolEvent() {
+        return eventQueue.poll();
+    }
+
+    @Override
+    public int getEventSize() {
+        return eventQueue.size();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
new file mode 100644
index 0000000000..a54210606e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskLogSendToRemoteEvent implements ITaskEvent {
+
+    private Integer workflowInstanceId;
+    private Integer taskInstanceId;
+
+    private String taskType;
+    private String logPath;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
new file mode 100644
index 0000000000..667c0de0cc
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class TaskLogSendToRemoteEventOperator implements 
ITaskEventOperator<TaskLogSendToRemoteEvent> {
+
+    @Override
+    public void handleEvent(TaskLogSendToRemoteEvent event) {
+        if (RemoteLogUtils.isRemoteLoggingEnable() && 
TaskUtils.isMasterTask(event.getTaskType())) {
+            RemoteLogUtils.sendRemoteLog(event.getLogPath());
+            log.info("Master sends task log {} to remote storage 
asynchronously.", event.getLogPath());
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
new file mode 100644
index 0000000000..79df2c093f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskOperationEvent implements ITaskEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+    private Integer taskInstanceId;
+
+    private TaskOperationType taskOperationType;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
new file mode 100644
index 0000000000..749894fc87
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
@@ -0,0 +1,48 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionDAG;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.dag.TaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class TaskOperationEventOperator implements 
ITaskEventOperator<TaskOperationEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(TaskOperationEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.error("Cannot find the IWorkflowExecutionRunnable for event: 
{}", event);
+            return;
+        }
+        IWorkflowExecutionDAG workflowExecutionDAG = 
workflowExecutionRunnable.getDagEngine().getWorkflowExecutionDAG();
+        TaskExecutionRunnable taskExecutionRunnable =
+                
workflowExecutionDAG.getTaskExecutionRunnableById(event.getTaskInstanceId());
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for event: {}", 
event);
+        }
+        switch (event.getTaskOperationType()) {
+            case DISPATCH:
+                taskExecutionRunnable.dispatch();
+                break;
+            case KILL:
+                taskExecutionRunnable.kill();
+                break;
+            case PAUSE:
+                taskExecutionRunnable.pause();
+                break;
+            default:
+                log.error("Unknown TaskOperationType for event: {}", event);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
new file mode 100644
index 0000000000..0029342f62
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
@@ -0,0 +1,8 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public enum TaskOperationType {
+
+    DISPATCH, KILL, PAUSE,
+    ;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
new file mode 100644
index 0000000000..950db9b468
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskSuccessEvent implements ITaskEvent {
+
+    private Integer workflowInstanceId;
+
+    private Integer taskInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
new file mode 100644
index 0000000000..48cf50b3b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class TaskSuccessEventOperator implements ITaskEventOperator {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(ITaskEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        Integer taskInstanceId = event.getTaskInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getByProcessInstanceId(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            log.error("Cannot find the WorkflowExecutionRunnable, the event: 
{} will be dropped", event);
+            return;
+        }
+
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
new file mode 100644
index 0000000000..e9de91a6cf
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowFailedEvent implements IWorkflowEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+
+    private String failedReason;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
new file mode 100644
index 0000000000..9400209224
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
@@ -0,0 +1,43 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowFailedEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowFailedEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Autowired
+    private EventDispatcher eventDispatcher;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Override
+    public void handleEvent(WorkflowFailedEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+
+        ProcessInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+        workflowInstance.setState(WorkflowExecutionStatus.FAILURE);
+        processInstanceDao.updateById(workflowInstance);
+        log.info("Handle WorkflowExecutionRunnableFailedEvent success, set 
workflowInstance status to {}",
+                workflowInstance.getState());
+
+        eventDispatcher.dispatchEvent(new 
WorkflowFinalizeEvent(workflowInstanceId));
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
new file mode 100644
index 0000000000..e4a4acebb3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class WorkflowFinalizeEvent implements IWorkflowEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
new file mode 100644
index 0000000000..4d4055c8db
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
@@ -0,0 +1,26 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowFinalizeEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowFinalizeEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(WorkflowFinalizeEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        
workflowExecuteRunnableRepository.removeByProcessInstanceId(workflowInstanceId);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
new file mode 100644
index 0000000000..db49459862
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
@@ -0,0 +1,20 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowFinishEvent implements IWorkflowEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+
+    private WorkflowExecutionStatus workflowExecutionStatus;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
new file mode 100644
index 0000000000..e0aee2d29b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowOperationEvent implements IWorkflowEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+    private WorkflowOperationType workflowOperationType;
+
+    public static WorkflowOperationEvent of(Integer workflowInstanceId, 
WorkflowOperationType workflowOperationType) {
+        return 
WorkflowOperationEvent.builder().workflowInstanceId(workflowInstanceId)
+                .workflowOperationType(workflowOperationType).build();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
new file mode 100644
index 0000000000..4231fb160c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
@@ -0,0 +1,68 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowOperationEventOperator implements 
IWorkflowEventOperator<WorkflowOperationEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(WorkflowOperationEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.warn(
+                    "Handle workflowExecutionRunnableKillOperationEvent: {} 
failed: WorkflowExecutionRunnable not found",
+                    event);
+            return;
+        }
+        switch (event.getWorkflowOperationType()) {
+            case TRIGGER:
+                triggerWorkflow(workflowExecutionRunnable);
+                break;
+            case PAUSE:
+                pauseWorkflow(workflowExecutionRunnable);
+                break;
+            case KILL:
+                killWorkflow(workflowExecutionRunnable);
+                break;
+            default:
+                log.error("Unknown operationType for event: {}", event);
+        }
+    }
+
+    private void triggerWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        try {
+            workflowExecutionRunnable.start();
+        } catch (Throwable exception) {
+            if (ExceptionUtils.isDatabaseConnectedFailedException(exception)) {
+                throw exception;
+            }
+            ProcessInstance workflowInstance =
+                    
workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstance();
+            log.error("Trigger workflow: {} failed", 
workflowInstance.getName(), exception);
+            WorkflowFailedEvent workflowExecutionRunnableFailedEvent = 
WorkflowFailedEvent.builder()
+                    
.workflowInstanceId(workflowInstance.getId()).failedReason(exception.getMessage()).build();
+            
workflowExecutionRunnable.getEventRepository().storeEventToTail(workflowExecutionRunnableFailedEvent);
+        }
+    }
+
+    private void pauseWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        workflowExecutionRunnable.pause();
+    }
+
+    private void killWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        workflowExecutionRunnable.kill();
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
new file mode 100644
index 0000000000..296c5c8dd6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
@@ -0,0 +1,10 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+public enum WorkflowOperationType {
+
+    TRIGGER,
+    PAUSE,
+    KILL,
+    ;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
new file mode 100644
index 0000000000..ee3c6843a4
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowTimeoutEvent implements IWorkflowEvent, IAsyncEvent {
+
+    private Integer workflowInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
new file mode 100644
index 0000000000..89014b8d9d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
@@ -0,0 +1,43 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowTimeoutEventOperator
+        implements
+            IEventOperator<WorkflowTimeoutEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> 
workflowExecutionRunnableRepository;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ProcessAlertManager processAlertManager;
+
+    @Override
+    public void handleEvent(WorkflowTimeoutEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecutionRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.warn("Cannot find the workflow instance by id: {}", 
event.getWorkflowInstanceId());
+            return;
+        }
+        // we only support timeout warning for now
+        ProcessInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+        ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
+        processAlertManager.sendProcessTimeoutAlert(workflowInstance, 
projectUser);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
new file mode 100644
index 0000000000..45fe8cab46
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowTriggerNextTaskEvent implements IWorkflowEvent, 
ISyncEvent {
+
+    private int workflowInstanceId;
+
+    /**
+     * The task name of the parent task, if it is the root task, the value is 
null
+     */
+    private String parentTaskNodeName;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
new file mode 100644
index 0000000000..50535c2b2c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowTriggerNextTaskEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowTriggerNextTaskEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(WorkflowTriggerNextTaskEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        
workflowExecutionRunnable.getDagEngine().triggerNextTasks(event.getParentTaskNodeName());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
new file mode 100644
index 0000000000..0c5283733d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowTriggeredEvent implements IWorkflowEvent, IAsyncEvent {
+
+    private int workflowInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
new file mode 100644
index 0000000000..e61e1f90c3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
@@ -0,0 +1,44 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowTriggeredEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowTriggeredEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ListenerEventAlertManager listenerEventAlertManager;
+
+    @Override
+    public void handleEvent(WorkflowTriggeredEvent event) {
+        int workflowInstanceId = event.getWorkflowInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        Long workflowDefinitionCode =
+                
workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowDefinition().getCode();
+        
ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit",
 "" + workflowDefinitionCode);
+
+        ProcessInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+        ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId);
+        
listenerEventAlertManager.publishProcessStartListenerEvent(workflowInstance, 
projectUser);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
new file mode 100644
index 0000000000..19a11b9639
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.exception;
+
+public class TaskExecuteRunnableNotFoundException extends RuntimeException {
+
+    public TaskExecuteRunnableNotFoundException(Integer workflowInstanceId) {
+        super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + 
"]");
+    }
+
+    public TaskExecuteRunnableNotFoundException(String workflowInstanceName) {
+        super("WorkflowExecuteRunnable not found: [name=" + 
workflowInstanceName + "]");
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
deleted file mode 100644
index ac37c94438..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.exception;
-
-public class TaskExecutionContextCreateException extends MasterException {
-
-    public TaskExecutionContextCreateException(String message) {
-        super(message);
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
new file mode 100644
index 0000000000..3ded58f3c6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
@@ -0,0 +1,13 @@
+package org.apache.dolphinscheduler.server.master.exception;
+
+public class WorkflowExecuteRunnableNotFoundException extends RuntimeException 
{
+
+    public WorkflowExecuteRunnableNotFoundException(Integer 
workflowInstanceId) {
+        super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + 
"]");
+    }
+
+    public WorkflowExecuteRunnableNotFoundException(String 
workflowInstanceName) {
+        super("WorkflowExecuteRunnable not found: [name=" + 
workflowInstanceName + "]");
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
index c1b13717bd..a1e60e0a6e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
@@ -28,8 +28,7 @@ public class DefaultTaskExecuteRunnable extends 
PriorityDelayTaskExecuteRunnable
 
     private final TaskExecuteRunnableOperatorManager 
taskExecuteRunnableOperatorManager;
 
-    public DefaultTaskExecuteRunnable(ProcessInstance workflowInstance,
-                                      TaskInstance taskInstance,
+    public DefaultTaskExecuteRunnable(ProcessInstance workflowInstance, 
TaskInstance taskInstance,
                                       TaskExecutionContext 
taskExecutionContext,
                                       TaskExecuteRunnableOperatorManager 
taskExecuteRunnableOperatorManager) {
         super(workflowInstance, taskInstance, taskExecutionContext);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
index ab749b5861..6c1a4e8569 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
@@ -19,8 +19,6 @@ package 
org.apache.dolphinscheduler.server.master.runner.execute;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import 
org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException;
-import 
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
 import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
@@ -46,17 +44,13 @@ public class DefaultTaskExecuteRunnableFactory implements 
TaskExecuteRunnableFac
     private TaskExecuteRunnableOperatorManager 
taskExecuteRunnableOperatorManager;
 
     @Override
-    public DefaultTaskExecuteRunnable createTaskExecuteRunnable(TaskInstance 
taskInstance) throws TaskExecuteRunnableCreateException {
+    public DefaultTaskExecuteRunnable createTaskExecuteRunnable(TaskInstance 
taskInstance) {
         WorkflowExecuteRunnable workflowExecuteRunnable =
                 
processInstanceExecCacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
-        try {
-            return new DefaultTaskExecuteRunnable(
-                    
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(),
-                    taskInstance,
-                    
taskExecutionContextFactory.createTaskExecutionContext(taskInstance),
-                    taskExecuteRunnableOperatorManager);
-        } catch (TaskExecutionContextCreateException ex) {
-            throw new TaskExecuteRunnableCreateException("Create 
DefaultTaskExecuteRunnable failed", ex);
-        }
+        return new DefaultTaskExecuteRunnable(
+                
workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(),
+                taskInstance,
+                
taskExecutionContextFactory.createTaskExecutionContext(taskInstance),
+                taskExecuteRunnableOperatorManager);
     }
 }

Reply via email to