This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_refactorMaster
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit aed4e80a268727382ad6828ee7ac64a62608cf03
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Mar 5 15:48:00 2024 +0800

    Refactor Master
---
 .../server/master/dag/DAGEngine.java               | 87 ++++++++++++++++++++
 .../server/master/dag/IDAGEngine.java              | 42 ++++++++++
 .../server/master/dag/IDAGEngineBuilder.java       |  7 ++
 .../master/dag/IEventfulExecutionRunnable.java     | 11 +++
 .../server/master/dag/IWorkflowDAG.java            | 23 ++++++
 .../server/master/dag/IWorkflowDAGBuilder.java     |  7 ++
 .../server/master/dag/IWorkflowEngine.java         | 49 ++++++++++++
 .../server/master/dag/IWorkflowExecutionDAG.java   | 16 ++++
 .../master/dag/IWorkflowExecutionDAGBuilder.java   |  6 ++
 .../master/dag/IWorkflowExecutionRunnable.java     | 68 ++++++++++++++++
 .../dag/IWorkflowExecutionRunnableBuilder.java     |  9 +++
 .../dag/TaskExecutionRunnableRepository.java       | 25 ++++++
 .../server/master/dag/TaskIdentify.java            |  5 ++
 .../master/dag/TaskTriggerConditionChecker.java    |  7 ++
 .../server/master/dag/WorkflowEngine.java          | 75 +++++++++++++++++
 .../master/dag/WorkflowExecuteRunnableFactory.java | 61 ++++++++++++++
 .../dag/WorkflowExecuteRunnableRepository.java     | 28 +++++++
 .../master/dag/WorkflowExecutionContext.java       | 21 +++++
 .../master/dag/WorkflowExecutionRunnable.java      | 64 +++++++++++++++
 .../server/master/dag/WorkflowIdentify.java        | 18 +++++
 .../server/master/events/EventDispatcher.java      | 53 ++++++++++++
 .../server/master/events/EventEngine.java          | 93 ++++++++++++++++++++++
 .../server/master/events/EventFirer.java           | 78 ++++++++++++++++++
 .../server/master/events/EventOperatorManager.java | 19 +++++
 .../server/master/events/EventRepository.java      | 36 +++++++++
 .../server/master/events/IAsyncEvent.java          |  7 ++
 .../server/master/events/IEvent.java               |  5 ++
 .../server/master/events/IEventDispatcher.java     | 15 ++++
 .../server/master/events/IEventFirer.java          | 19 +++++
 .../server/master/events/IEventOperator.java       | 15 ++++
 .../master/events/IEventOperatorManager.java       | 16 ++++
 .../server/master/events/IEventRepository.java     | 16 ++++
 .../server/master/events/ISyncEvent.java           |  4 +
 .../server/master/events/ITaskEvent.java           |  9 +++
 .../server/master/events/ITaskEventOperator.java   |  5 ++
 .../server/master/events/IWorkflowEvent.java       | 12 +++
 .../master/events/IWorkflowEventOperator.java      |  6 ++
 .../master/events/TaskLogSendToRemoteEvent.java    | 19 +++++
 .../events/TaskLogSendToRemoteEventOperator.java   | 21 +++++
 .../server/master/events/TaskOperationEvent.java   | 18 +++++
 .../master/events/TaskOperationEventOperator.java  | 48 +++++++++++
 .../server/master/events/TaskOperationType.java    |  8 ++
 .../server/master/events/TaskSuccessEvent.java     | 16 ++++
 .../master/events/TaskSuccessEventOperator.java    | 30 +++++++
 .../server/master/events/WorkflowFailedEvent.java  | 18 +++++
 .../master/events/WorkflowFailedEventOperator.java | 43 ++++++++++
 .../master/events/WorkflowFinalizeEvent.java       | 15 ++++
 .../events/WorkflowFinalizeEventOperator.java      | 26 ++++++
 .../server/master/events/WorkflowFinishEvent.java  | 20 +++++
 .../master/events/WorkflowOperationEvent.java      | 24 ++++++
 .../events/WorkflowOperationEventOperator.java     | 68 ++++++++++++++++
 .../master/events/WorkflowOperationType.java       | 10 +++
 .../server/master/events/WorkflowTimeoutEvent.java | 16 ++++
 .../events/WorkflowTimeoutEventOperator.java       | 43 ++++++++++
 .../events/WorkflowTriggerNextTaskEvent.java       | 21 +++++
 .../WorkflowTriggerNextTaskEventOperator.java      | 27 +++++++
 .../master/events/WorkflowTriggeredEvent.java      | 16 ++++
 .../events/WorkflowTriggeredEventOperator.java     | 44 ++++++++++
 .../TaskExecuteRunnableNotFoundException.java      | 12 +++
 .../TaskExecutionContextCreateException.java       | 26 ------
 .../WorkflowExecuteRunnableNotFoundException.java  | 13 +++
 61 files changed, 1613 insertions(+), 26 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
new file mode 100644
index 0000000000..4748fb5084
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/DAGEngine.java
@@ -0,0 +1,87 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.EventRepository;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.TaskOperationType;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Builder
+@AllArgsConstructor
+public class DAGEngine implements IDAGEngine {
+
+    private final IWorkflowExecutionDAG workflowExecutionDAG;
+
+    private final List<TaskTriggerConditionChecker> 
taskTriggerConditionCheckers;
+
+    private final TaskExecuteRunnableFactory<TaskExecuteRunnable> 
taskExecuteRunnableFactory;
+
+    private final EventRepository eventRepository;
+
+    @Override
+    public void triggerNextTasks(String parentTaskNodeName) {
+        
workflowExecutionDAG.getWorkflowDAG().getPostNodeNames(parentTaskNodeName).forEach(this::triggerTask);
+    }
+
+    @Override
+    @SneakyThrows
+    public void triggerTask(String taskName) {
+        for (TaskTriggerConditionChecker taskTriggerConditionChecker : 
taskTriggerConditionCheckers) {
+            if (!taskTriggerConditionChecker.taskCanTrigger(taskName)) {
+                return;
+            }
+        }
+        // todo: create Task ExecutionRunnable
+        TaskExecuteRunnable taskExecuteRunnable = 
taskExecuteRunnableFactory.createTaskExecuteRunnable(null);
+        TaskOperationEvent taskOperationEvent =
+                
TaskOperationEvent.builder().workflowInstanceId(taskExecuteRunnable.getWorkflowInstance().getId())
+                        
.taskInstanceId(taskExecuteRunnable.getTaskInstance().getId())
+                        .taskOperationType(TaskOperationType.DISPATCH).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+        workflowExecutionDAG.markTaskSubmitted(taskExecuteRunnable);
+
+    }
+
+    @Override
+    public void pauseTask(Integer taskInstanceId) {
+        TaskExecuteRunnable taskExecutionRunnable = 
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", 
taskInstanceId);
+            return;
+        }
+
+        TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
+                
.workflowInstanceId(taskExecutionRunnable.getTaskInstance().getProcessInstanceId())
+                
.taskInstanceId(taskExecutionRunnable.getTaskInstance().getId())
+                .taskOperationType(TaskOperationType.PAUSE).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+    @Override
+    public void killTask(Integer taskInstanceId) {
+        TaskExecuteRunnable taskExecutionRunnable = 
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
+        if (taskExecutionRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for: {}", 
taskInstanceId);
+            return;
+        }
+
+        TaskOperationEvent taskOperationEvent =
+                
TaskOperationEvent.builder().workflowInstanceId(taskExecutionRunnable.getWorkflowInstance().getId())
+                        
.taskInstanceId(taskExecutionRunnable.getTaskInstance().getId())
+                        .taskOperationType(TaskOperationType.KILL).build();
+        eventRepository.storeEventToTail(taskOperationEvent);
+    }
+
+    @Override
+    public IWorkflowExecutionDAG getWorkflowExecutionDAG() {
+        return workflowExecutionDAG;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
new file mode 100644
index 0000000000..5e3b651872
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngine.java
@@ -0,0 +1,42 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+/**
+ * The IDAGEngine is responsible for triggering, killing, pausing, and 
finalizing tasks in an {@link IWorkflowExecutionDAG}.
+ * <p>All DAG operations should directly use the methods in IDAGEngine; a new 
{@link IWorkflowExecutionDAG} should be triggered by a new IDAGEngine.
+ */
+public interface IDAGEngine {
+
+    /**
+     * Trigger the tasks which follow the given task.
+     * <p> If there are no tasks after the given task node, this will try to finish 
the WorkflowExecutionRunnable.
+     *
+     *
+     * @param parentTaskNodeName the parent task name
+     */
+    void triggerNextTasks(String parentTaskNodeName);
+
+    /**
+     * Trigger the given task
+     *
+     * @param taskName task name
+     */
+    void triggerTask(String taskName);
+
+    /**
+     * Pause the given task.
+     */
+    void pauseTask(Integer taskId);
+
+    /**
+     * Kill the given task.
+     */
+    void killTask(Integer taskId);
+
+    /**
+     * Get the {@link IWorkflowExecutionDAG} that belongs to this engine.
+     *
+     * @return workflow execution DAG.
+     */
+    IWorkflowExecutionDAG getWorkflowExecutionDAG();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java
new file mode 100644
index 0000000000..8e713d1274
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IDAGEngineBuilder.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface IDAGEngineBuilder {
+
+    IDAGEngine buildDAGEngine(IWorkflowExecutionDAG workflowExecutionDAG);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
new file mode 100644
index 0000000000..dcc87e050d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IEventfulExecutionRunnable.java
@@ -0,0 +1,11 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEvent;
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+
+public interface IEventfulExecutionRunnable {
+
+    default IEventRepository<IEvent> getEventRepository() {
+        return null;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
new file mode 100644
index 0000000000..08a69e41ab
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAG.java
@@ -0,0 +1,23 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.List;
+
+public interface IWorkflowDAG {
+
+    /**
+     * Return the names of the post tasks of the given parentTaskName.
+     *
+     * @param parentTaskName parent task name, can be null.
+     * @return post task name list, sorted by priority.
+     */
+    List<String> getPostNodeNames(String parentTaskName);
+
+    /**
+     * Get the names of the parent (pre) tasks of the given taskName.
+     *
+     * @param taskName task name, can be null.
+     * @return parent task name list.
+     */
+    List<String> getParentNodeNames(String taskName);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java
new file mode 100644
index 0000000000..3b95fd07b1
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowDAGBuilder.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface IWorkflowDAGBuilder {
+
+    IWorkflowDAG buildWorkflowDAG(WorkflowIdentify workflowIdentify);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
new file mode 100644
index 0000000000..b7c430cfe2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowEngine.java
@@ -0,0 +1,49 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import 
org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;
+
+/**
+ * The WorkflowEngine is responsible for starting, stopping, pausing, and 
finalizing workflows.
+ */
+public interface IWorkflowEngine {
+
+    /**
+     * Start the workflow engine.
+     */
+    void start();
+
+    /**
+     * Trigger a workflow to start.
+     *
+     * @param workflowExecuteRunnable the workflow to start
+     */
+    void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable);
+
+    /**
+     * Pause a workflow instance.
+     *
+     * @param workflowInstanceId the ID of the workflow to pause
+     * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not 
found
+     */
+    void pauseWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Kill a workflow instance.
+     *
+     * @param workflowInstanceId the ID of the workflow to kill
+     * @throws WorkflowExecuteRunnableNotFoundException if the workflow is not 
found
+     */
+    void killWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Finalize a workflow instance. Once a workflow has been finalized, 
it cannot receive new operations and will be removed from memory.
+     *
+     * @param workflowInstanceId the ID of the workflow to finalize
+     */
+    void finalizeWorkflow(Integer workflowInstanceId);
+
+    /**
+     * Stop the workflow engine.
+     */
+    void stop();
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
new file mode 100644
index 0000000000..ed94fcd0de
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAG.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
+
+import java.util.List;
+
+public interface IWorkflowExecutionDAG {
+
+    IWorkflowDAG getWorkflowDAG();
+
+    void markTaskSubmitted(TaskExecuteRunnable taskExecutionRunnable);
+
+    TaskExecuteRunnable getTaskExecutionRunnableById(Integer taskInstanceId);
+
+    List<TaskExecuteRunnable> getActiveTaskExecutionRunnables();
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java
new file mode 100644
index 0000000000..a13865aa54
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionDAGBuilder.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface IWorkflowExecutionDAGBuilder {
+
+    IWorkflowExecutionDAG buildWorkflowExecutionDAG(IWorkflowDAG workflowDAG);
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
new file mode 100644
index 0000000000..bff326c12c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+/**
+ * The IWorkflowExecutionRunnable represents a running workflow instance; it is 
responsible for operating the workflow instance, e.g. start, kill, pause, timeout.
+ */
+public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable 
{
+
+    /**
+     * Start the workflow instance.
+     */
+    void start();
+
+    /**
+     * Kill the workflow instance.
+     */
+    void kill();
+
+    /**
+     * Pause the workflow instance.
+     */
+    void pause();
+
+    /**
+     * Get the workflow execution context.
+     *
+     * @return the workflow execution context
+     */
+    WorkflowExecutionContext getWorkflowExecutionContext();
+
+    /**
+     * Get the {@link IDAGEngine} which is used to execute the DAG of the 
workflow instance.
+     *
+     * @return dag engine.
+     */
+    IDAGEngine getDagEngine();
+
+    default Integer getWorkflowInstanceId() {
+        return getWorkflowExecutionContext().getWorkflowInstance().getId();
+    }
+
+    default String getWorkflowInstanceName() {
+        return getWorkflowExecutionContext().getWorkflowInstance().getName();
+    }
+
+    default ProcessInstance getWorkflowInstance() {
+        return getWorkflowExecutionContext().getWorkflowInstance();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java
new file mode 100644
index 0000000000..85cee76474
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/IWorkflowExecutionRunnableBuilder.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import 
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecutionContext;
+
+public interface IWorkflowExecutionRunnableBuilder {
+
+    IWorkflowExecutionRunnable 
buildWorkflowExecutionRunnable(IWorkflowExecutionContext 
workflowExecutionContext);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
new file mode 100644
index 0000000000..7ebf008568
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskExecutionRunnableRepository.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class TaskExecutionRunnableRepository {
+
+    private Map<Integer, TaskExecuteRunnable> taskExecuteRunnableMap = new 
ConcurrentHashMap<>();
+
+    public TaskExecuteRunnable getTaskExecutionRunnableById(Integer 
taskInstanceId) {
+        return taskExecuteRunnableMap.get(taskInstanceId);
+    }
+
+    public List<TaskExecuteRunnable> getActiveTaskExecutionRunnables() {
+        return 
taskExecuteRunnableMap.values().stream().filter(taskExecuteRunnable -> {
+            // keep only the task runnables whose state is not finished
+            return 
!taskExecuteRunnable.getTaskInstance().getState().isFinished();
+        }).collect(Collectors.toList());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
new file mode 100644
index 0000000000..e3e7420ab4
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskIdentify.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public class TaskIdentify {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
new file mode 100644
index 0000000000..daf652e0b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/TaskTriggerConditionChecker.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+public interface TaskTriggerConditionChecker {
+
+    boolean taskCanTrigger(String taskName);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
new file mode 100644
index 0000000000..eb8bc0cfa1
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowEngine.java
@@ -0,0 +1,75 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationEvent;
+import org.apache.dolphinscheduler.server.master.events.WorkflowOperationType;
+import 
org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowEngine implements IWorkflowEngine {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info("{} started", getClass().getName());
+    }
+
+    @Override
+    public void triggerWorkflow(IWorkflowExecutionRunnable 
workflowExecuteRunnable) {
+        ProcessInstance workflowInstance = 
workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstance();
+        Integer workflowInstanceId = workflowInstance.getId();
+        log.info("Triggering WorkflowExecutionRunnable: {}", 
workflowInstance.getName());
+        
workflowExecuteRunnableRepository.storeWorkflowExecutionRunnable(workflowExecuteRunnable);
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.TRIGGER));
+    }
+
+    @Override
+    public void pauseWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            throw new 
WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
+        }
+        log.info("Pausing WorkflowExecutionRunnable: {}", 
workflowExecuteRunnable.getWorkflowInstanceName());
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.PAUSE));
+    }
+
+    @Override
+    public void killWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            throw new 
WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
+        }
+        log.info("Killing WorkflowExecutionRunnable: {}", 
workflowExecuteRunnable.getWorkflowInstanceName());
+        workflowExecuteRunnable.getEventRepository()
+                
.storeEventToTail(WorkflowOperationEvent.of(workflowInstanceId, 
WorkflowOperationType.KILL));
+    }
+
+    @Override
+    public void finalizeWorkflow(Integer workflowInstanceId) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            return;
+        }
+        log.info("Finalizing WorkflowExecutionRunnable: {}", 
workflowExecutionRunnable.getWorkflowInstanceName());
+        
workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowInstanceId);
+    }
+
+    @Override
+    public void stop() {
+        log.info("{} stopped", getClass().getName());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java
new file mode 100644
index 0000000000..b1503ed01a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import 
org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
+import 
org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
+import 
org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContextFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
+import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.command.CommandService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecuteRunnableFactory {
+
+    @Autowired
+    private WorkflowExecuteContextFactory workflowExecuteContextFactory;
+
+    public Optional<WorkflowExecutionRunnable> 
createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException {
+        try {
+            Optional<IWorkflowExecuteContext> 
workflowExecuteRunnableContextOptional = 
workflowExecuteContextFactory.createWorkflowExecuteRunnableContext(command);
+            return 
workflowExecuteRunnableContextOptional.map(iWorkflowExecuteContext -> {
+                return WorkflowExecutionRunnable.builder().build();
+            });
+        } catch (Exception ex) {
+            throw new WorkflowCreateException("Create WorkflowExecuteRunnable 
failed", ex);
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
new file mode 100644
index 0000000000..acf61d46ad
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecuteRunnableRepository.java
@@ -0,0 +1,28 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowExecuteRunnableRepository {
+
+    private final Map<Integer, IWorkflowExecutionRunnable> 
workflowExecutionRunnableMap = new ConcurrentHashMap<>();
+
+    public void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        
workflowExecutionRunnableMap.put(workflowExecutionRunnable.getWorkflowInstanceId(),
 workflowExecutionRunnable);
+    }
+
+    public IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer 
workflowInstanceId) {
+        return workflowExecutionRunnableMap.get(workflowInstanceId);
+    }
+
+    public void removeWorkflowExecutionRunnable(Integer workflowInstanceId) {
+        workflowExecutionRunnableMap.remove(workflowInstanceId);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
new file mode 100644
index 0000000000..7df8cf9ad1
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionContext.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Holder that pairs a workflow definition with one of its instances.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutionContext {
+
+    // The definition of the workflow.
+    private ProcessDefinition workflowDefinition;
+
+    // The workflow instance.
+    private ProcessInstance workflowInstance;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
new file mode 100644
index 0000000000..aeda4ed5f7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowExecutionRunnable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dag;
+
+import org.apache.dolphinscheduler.server.master.events.IEvent;
+import org.apache.dolphinscheduler.server.master.events.IEventRepository;
+import org.apache.dolphinscheduler.server.master.events.WorkflowTriggeredEvent;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link IWorkflowExecutionRunnable} implementation that delegates task level operations to an
+ * {@link IDAGEngine} and records workflow level events in an {@link IEventRepository}.
+ * <p>
+ * NOTE(review): {@code eventRepository} has no generated getter here although other classes call
+ * {@code getEventRepository()} on the interface - confirm the interface provides it.
+ */
+@Slf4j
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable {
+
+    @Getter
+    private WorkflowExecutionContext workflowExecutionContext;
+
+    @Getter
+    private IDAGEngine dagEngine;
+
+    private IEventRepository<IEvent> eventRepository;
+
+    /**
+     * Append a {@link WorkflowTriggeredEvent} for this workflow instance to the event repository,
+     * then ask the DAG engine to trigger the next tasks, passing {@code null} as the argument.
+     */
+    public void start() {
+        Integer instanceId = getWorkflowInstanceId();
+        IEvent triggeredEvent = new WorkflowTriggeredEvent(instanceId);
+        eventRepository.storeEventToTail(triggeredEvent);
+        dagEngine.triggerNextTasks(null);
+    }
+
+    /**
+     * Request a pause of every currently active task of this workflow.
+     */
+    @Override
+    public void pause() {
+        dagEngine.getWorkflowExecutionDAG()
+                .getActiveTaskExecutionRunnables()
+                .stream()
+                .forEach(activeTask -> dagEngine.pauseTask(activeTask.getTaskInstance().getId()));
+    }
+
+    /**
+     * Request a kill of every currently active task of this workflow.
+     */
+    @Override
+    public void kill() {
+        dagEngine.getWorkflowExecutionDAG()
+                .getActiveTaskExecutionRunnables()
+                .forEach(activeTask -> dagEngine.killTask(activeTask.getTaskInstance().getId()));
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
new file mode 100644
index 0000000000..84a8b94bb2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dag/WorkflowIdentify.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.dag;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Value object made of a workflow code and a workflow version.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowIdentify {
+
+    // Code of the workflow.
+    private long workflowCode;
+
+    // Version of the workflow.
+    private int workflowVersion;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
new file mode 100644
index 0000000000..948cb476d7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventDispatcher.java
@@ -0,0 +1,53 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Dispatches an {@link IEvent} to the event repository of the workflow execution it belongs to
+ * and then wakes up the {@link EventEngine}.
+ */
+@Slf4j
+@Component
+public class EventDispatcher implements IEventDispatcher<IEvent> {
+
+    @Autowired
+    private EventEngine eventEngine;
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Override
+    public void start() {
+        log.info(getClass().getName() + " started");
+    }
+
+    /**
+     * Store the event at the tail of the owning workflow's event repository.
+     * If no workflow execution is registered for the event's workflow instance id, the event is
+     * logged and dropped.
+     *
+     * @param iEvent the event to dispatch; must be an {@link IWorkflowEvent} or an {@link ITaskEvent}
+     * @throws IllegalArgumentException if the event is of neither supported type
+     */
+    @Override
+    public void dispatchEvent(IEvent iEvent) {
+        Integer workflowInstanceId;
+        if (iEvent instanceof IWorkflowEvent) {
+            workflowInstanceId = ((IWorkflowEvent) iEvent).getWorkflowInstanceId();
+        } else if (iEvent instanceof ITaskEvent) {
+            workflowInstanceId = ((ITaskEvent) iEvent).getWorkflowInstanceId();
+        } else {
+            throw new IllegalArgumentException("Unsupported event type: " + iEvent.getClass().getName());
+        }
+
+        IWorkflowExecutionRunnable workflowExecuteRunnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecuteRunnable == null) {
+            log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", iEvent);
+            return;
+        }
+        workflowExecuteRunnable.getEventRepository().storeEventToTail(iEvent);
+        log.debug("Success dispatch event {} to EventRepository", iEvent);
+        // Object#notify may only be called by the owner of the object's monitor; calling it
+        // without the lock throws IllegalMonitorStateException.
+        synchronized (eventEngine) {
+            eventEngine.notify();
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.info(getClass().getName() + " stopped");
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
new file mode 100644
index 0000000000..1d37f4b19b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventEngine.java
@@ -0,0 +1,93 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Background thread that repeatedly fires the pending events of every registered workflow execution.
+ * After each round it waits on its own monitor for up to 5 seconds; a {@code notify()} on this
+ * object wakes it up earlier.
+ */
+@Slf4j
+@Component
+public class EventEngine extends BaseDaemonThread {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Autowired
+    private EventFirer eventFirer;
+
+    // Ids of the workflow instances whose events are being fired right now.
+    private final Set<Integer> firingWorkflowInstanceIds = ConcurrentHashMap.newKeySet();
+
+    public EventEngine() {
+        super("EventEngine");
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info(getClass().getName() + " started");
+    }
+
+    /**
+     * Main loop: fire all active events, then wait until notified or until 5 seconds have passed.
+     * The loop ends when the thread is interrupted; any other failure is logged and retried after
+     * 3 seconds.
+     */
+    @Override
+    public void run() {
+        for (;;) {
+            try {
+                StopWatch stopWatch = StopWatch.createStarted();
+                fireAllActiveEvents();
+                stopWatch.stop();
+                log.info("Fire all active events cost: {} ms", stopWatch.getTime());
+                // Object#wait requires the caller to own the monitor, otherwise it throws
+                // IllegalMonitorStateException. A notify that arrives before we start waiting
+                // is missed, the timeout bounds the resulting delay.
+                synchronized (this) {
+                    this.wait(5_000);
+                }
+            } catch (InterruptedException interruptedException) {
+                // Restore the interrupt flag and stop, retrying would make wait() fail again at once.
+                Thread.currentThread().interrupt();
+                log.warn("EventEngine is interrupted, stop firing events");
+                break;
+            } catch (Throwable throwable) {
+                log.error("Fire active event error", throwable);
+                ThreadUtils.sleep(3_000);
+            }
+        }
+    }
+
+    /**
+     * Submit the pending events of every registered workflow execution for firing.
+     * A workflow whose previous firing has not completed yet is skipped in this round.
+     */
+    public void fireAllActiveEvents() {
+        Collection<IWorkflowExecutionRunnable> workflowExecutionRunnableCollection =
+                workflowExecuteRunnableRepository.getAll();
+        for (IWorkflowExecutionRunnable workflowExecutionRunnable : workflowExecutionRunnableCollection) {
+            ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
+            final Integer workflowInstanceId = workflowInstance.getId();
+            final String workflowInstanceName = workflowInstance.getName();
+            try {
+                LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
+                if (firingWorkflowInstanceIds.contains(workflowInstanceId)) {
+                    log.debug("WorkflowExecutionRunnable: {} is already in firing", workflowInstanceName);
+                    // Only skip this workflow; the remaining workflows still need to be fired.
+                    continue;
+                }
+                IEventRepository<IEvent> workflowEventRepository = workflowExecutionRunnable.getEventRepository();
+                firingWorkflowInstanceIds.add(workflowInstanceId);
+                eventFirer.fireActiveEvents(workflowEventRepository)
+                        .whenComplete((fireCount, ex) -> {
+                            firingWorkflowInstanceIds.remove(workflowInstanceId);
+                            if (ex != null) {
+                                log.error("Fire event for WorkflowExecutionRunnable: {} error", workflowInstanceName,
+                                        ex);
+                            } else {
+                                if (fireCount > 0) {
+                                    log.info("Fire {} events for WorkflowExecutionRunnable: {} success", fireCount,
+                                            workflowInstanceName);
+                                }
+                            }
+                        });
+            } finally {
+                LogUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
new file mode 100644
index 0000000000..2782528a3f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventFirer.java
@@ -0,0 +1,78 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link IEventFirer} that drains an event repository on a dedicated thread pool and hands each
+ * event to the operator returned by the {@link IEventOperatorManager}.
+ */
+@Slf4j
+@Component
+public class EventFirer implements IEventFirer<IEvent> {
+
+    private final IEventOperatorManager<IEvent> eventOperatorManager;
+
+    private final ThreadPoolExecutor eventFireThreadPool;
+
+    public EventFirer(IEventOperatorManager<IEvent> eventOperatorManager, MasterConfig masterConfig) {
+        this.eventOperatorManager = eventOperatorManager;
+        this.eventFireThreadPool =
+                ThreadUtils.newDaemonFixedThreadExecutor("EventFireThreadPool", masterConfig.getExecThreads());
+    }
+
+    /**
+     * Drain the repository on the fire thread pool until it reports no more events.
+     * {@link IAsyncEvent}s are submitted and counted without waiting for them; all other events
+     * are handled in order on the draining thread. When a synchronous event fails, draining stops
+     * and the returned future completes exceptionally; if the failure is a database connection
+     * failure the event is put back at the head of the repository first.
+     *
+     * @param eventRepository the repository to drain
+     * @return a future holding the number of fired events; already completed with 0 if the repository is empty
+     */
+    @Override
+    public CompletableFuture<Integer> fireActiveEvents(IEventRepository<IEvent> eventRepository) {
+        if (eventRepository.getEventSize() == 0) {
+            return CompletableFuture.completedFuture(0);
+        }
+        return CompletableFuture.supplyAsync(() -> {
+            int firedEvents = 0;
+            IEvent nextEvent;
+            while ((nextEvent = eventRepository.poolEvent()) != null) {
+                if (nextEvent instanceof IAsyncEvent) {
+                    fireAsyncEvent(nextEvent);
+                } else {
+                    try {
+                        fireSyncEvent(nextEvent);
+                    } catch (Exception ex) {
+                        // The database could not be reached: keep the event so it is retried later.
+                        if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) {
+                            eventRepository.storeEventToHead(nextEvent);
+                        }
+                        throw ex;
+                    }
+                }
+                firedEvents++;
+            }
+            return firedEvents;
+        }, eventFireThreadPool);
+    }
+
+    // Handle the event on the fire thread pool without waiting for it; failures are only logged.
+    private void fireAsyncEvent(IEvent event) {
+        final Runnable handling = () -> {
+            log.info("Begin fire IAsyncEvent: {}", event);
+            eventOperatorManager.getEventOperator(event).handleEvent(event);
+            log.info("Success fire IAsyncEvent: {}", event);
+        };
+        CompletableFuture.runAsync(handling, eventFireThreadPool).exceptionally(ex -> {
+            log.error("Failed to fire IAsyncEvent: {}", event, ex);
+            return null;
+        });
+    }
+
+    // Handle the event on the calling thread; exceptions propagate to the caller.
+    private void fireSyncEvent(IEvent event) {
+        log.info("Begin fire SyncEvent: {}", event);
+        final IEventOperator<IEvent> operator = eventOperatorManager.getEventOperator(event);
+        operator.handleEvent(event);
+        log.info("Success fire SyncEvent: {}", event);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
new file mode 100644
index 0000000000..a7915a168b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventOperatorManager.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link IEventOperatorManager} implementation for {@link IEvent}.
+ * <p>
+ * NOTE(review): this is currently a stub, {@link #getEventOperator(IEvent)} returns {@code null}
+ * for every event, so callers that invoke the result directly will fail with a NullPointerException.
+ */
+@Slf4j
+@Component
+public class EventOperatorManager implements IEventOperatorManager<IEvent> {
+
+    @Override
+    public IEventOperator<IEvent> getEventOperator(IEvent event) {
+        return null;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java
new file mode 100644
index 0000000000..5cf0d38ff7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/EventRepository.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link IEventRepository} backed by an unbounded {@link LinkedBlockingDeque}.
+ */
+@Slf4j
+public class EventRepository implements IEventRepository<IEvent> {
+
+    protected final LinkedBlockingDeque<IEvent> eventQueue = new LinkedBlockingDeque<>();
+
+    public EventRepository() {
+    }
+
+    /** Append the event at the end of the queue. */
+    @Override
+    public void storeEventToTail(IEvent event) {
+        eventQueue.offer(event);
+    }
+
+    /** Insert the event at the front of the queue so it is polled next. */
+    @Override
+    public void storeEventToHead(IEvent event) {
+        eventQueue.offerFirst(event);
+    }
+
+    /** Remove and return the event at the front of the queue, or {@code null} if the queue is empty. */
+    @Override
+    public IEvent poolEvent() {
+        return eventQueue.pollFirst();
+    }
+
+    /** Return the number of events currently queued. */
+    @Override
+    public int getEventSize() {
+        return eventQueue.size();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
new file mode 100644
index 0000000000..1ea4553554
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IAsyncEvent.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Marks an event as asynchronous: the event is handled asynchronously and the firer does not
+ * wait for the handling to complete before moving on to the next event.
+ */
+public interface IAsyncEvent {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
new file mode 100644
index 0000000000..425ba97d49
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEvent.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Root type of the events handled by the master event components. Declares no methods.
+ */
+public interface IEvent {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
new file mode 100644
index 0000000000..e32f8bc619
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventDispatcher.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event dispatcher interface used to dispatch event.
+ * Each event should be dispatched to the corresponding workflow event queue.
+ *
+ * @param <E> event type
+ */
+public interface IEventDispatcher<E> {
+
+    /** Start the dispatcher. */
+    void start();
+
+    /** Stop the dispatcher. */
+    void stop();
+
+    /**
+     * Dispatch the given event.
+     *
+     * @param event the event to dispatch
+     */
+    void dispatchEvent(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
new file mode 100644
index 0000000000..576942089d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventFirer.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The event firer interface used to fire event.
+ *
+ * @param <E> event type
+ */
+public interface IEventFirer<E> {
+
+    /**
+     * Fire all active events in the event repository
+     *
+     * @param eventRepository the repository holding the events to fire
+     * @return the count of fired success events
+     */
+    CompletableFuture<Integer> fireActiveEvents(IEventRepository<E> 
eventRepository);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
new file mode 100644
index 0000000000..ef767e35d0
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperator.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator interface used to handle event.
+ *
+ * @param <E> event type
+ */
+public interface IEventOperator<E> {
+
+    /**
+     * Handle the given event
+     *
+     * @param event event
+     */
+    void handleEvent(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
new file mode 100644
index 0000000000..0670dc6d57
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventOperatorManager.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event operator manager interface used to get event operator.
+ *
+ * @param <E> event type
+ */
+public interface IEventOperatorManager<E> {
+
+    /**
+     * Get the {@link IEventOperator} for the given event.
+     *
+     * @param event event
+     * @return event operator for the given event
+     */
+    IEventOperator<E> getEventOperator(E event);
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
new file mode 100644
index 0000000000..b6f3d284f5
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IEventRepository.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The event repository interface used to store event.
+ *
+ * @param <E> event type
+ */
+public interface IEventRepository<E> {
+
+    /** Store the event at the tail of the repository. */
+    void storeEventToTail(E event);
+
+    /** Store the event at the head of the repository. */
+    void storeEventToHead(E event);
+
+    /**
+     * Take the next event out of the repository.
+     *
+     * @return the next event, or {@code null} when no event is pending
+     */
+    E poolEvent();
+
+    /** @return the number of events currently stored */
+    int getEventSize();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
new file mode 100644
index 0000000000..95de44d0b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ISyncEvent.java
@@ -0,0 +1,4 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Marker interface without methods, the counterpart of {@link IAsyncEvent}.
+ * NOTE(review): presumably marks events that must be handled synchronously and in order - confirm.
+ */
+public interface ISyncEvent {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
new file mode 100644
index 0000000000..37b2a99b15
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEvent.java
@@ -0,0 +1,9 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEvent} that relates to a task instance of a workflow instance.
+ */
+public interface ITaskEvent extends IEvent {
+
+    /** @return the id of the workflow instance the event is related to */
+    Integer getWorkflowInstanceId();
+
+    /** @return the id of the task instance the event is related to */
+    Integer getTaskInstanceId();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
new file mode 100644
index 0000000000..d2588a7027
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/ITaskEventOperator.java
@@ -0,0 +1,5 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEventOperator} restricted to events that implement {@link ITaskEvent}.
+ *
+ * @param <E> task event type
+ */
+public interface ITaskEventOperator<E extends ITaskEvent> extends 
IEventOperator<E> {
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
new file mode 100644
index 0000000000..a9dc0a31a5
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEvent.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEvent} that relates to a workflow instance.
+ */
+public interface IWorkflowEvent extends IEvent {
+
+    /**
+     * The id of WorkflowInstance which the event is related to
+     *
+     * @return workflowInstanceId, shouldn't be null
+     */
+    Integer getWorkflowInstanceId();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
new file mode 100644
index 0000000000..5d70dd1859
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/IWorkflowEventOperator.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * An {@link IEventOperator} restricted to events that implement {@link IWorkflowEvent}.
+ *
+ * @param <E> workflow event type
+ */
+public interface IWorkflowEventOperator<E extends IWorkflowEvent>
+        extends
+            IEventOperator<E> {
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
new file mode 100644
index 0000000000..a54210606e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEvent.java
@@ -0,0 +1,19 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Task event carrying the task type and the log path of a task instance.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskLogSendToRemoteEvent implements ITaskEvent {
+
+    private Integer workflowInstanceId;
+    private Integer taskInstanceId;
+
+    // Type of the task that produced the log.
+    private String taskType;
+    // Path of the task log.
+    private String logPath;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
new file mode 100644
index 0000000000..667c0de0cc
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskLogSendToRemoteEventOperator.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Operator for {@link TaskLogSendToRemoteEvent}: sends the task log to remote storage.
+ */
+@Slf4j
+@Component
+public class TaskLogSendToRemoteEventOperator implements ITaskEventOperator<TaskLogSendToRemoteEvent> {
+
+    /**
+     * Send the log referenced by the event to remote storage. Nothing happens unless remote
+     * logging is enabled and the event's task type is a master task.
+     *
+     * @param event the event describing the task log
+     */
+    @Override
+    public void handleEvent(TaskLogSendToRemoteEvent event) {
+        if (!RemoteLogUtils.isRemoteLoggingEnable()) {
+            return;
+        }
+        if (!TaskUtils.isMasterTask(event.getTaskType())) {
+            return;
+        }
+        RemoteLogUtils.sendRemoteLog(event.getLogPath());
+        log.info("Master sends task log {} to remote storage asynchronously.", event.getLogPath());
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
new file mode 100644
index 0000000000..79df2c093f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEvent.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Task event requesting a {@link TaskOperationType} operation on a task instance.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskOperationEvent implements ITaskEvent, ISyncEvent {
+
+    private Integer workflowInstanceId;
+    private Integer taskInstanceId;
+
+    // The operation requested for the task instance.
+    private TaskOperationType taskOperationType;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
new file mode 100644
index 0000000000..c4b8b53c13
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationEventOperator.java
@@ -0,0 +1,48 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionDAG;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Operator for {@link TaskOperationEvent}: resolves the task execution the event refers to and
+ * invokes the requested operation on it.
+ */
+@Slf4j
+@Component
+public class TaskOperationEventOperator implements ITaskEventOperator<TaskOperationEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    /**
+     * Dispatch, kill or pause the task referenced by the event, depending on its operation type.
+     * The event is logged and dropped when the workflow execution or the task execution cannot
+     * be found.
+     *
+     * @param event the task operation event
+     */
+    @Override
+    public void handleEvent(TaskOperationEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.error("Cannot find the IWorkflowExecutionRunnable for event: {}", event);
+            return;
+        }
+        IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionRunnable.getDagEngine().getWorkflowExecutionDAG();
+        TaskExecuteRunnable taskExecuteRunnable =
+                workflowExecutionDAG.getTaskExecutionRunnableById(event.getTaskInstanceId());
+        if (taskExecuteRunnable == null) {
+            log.error("Cannot find the ITaskExecutionRunnable for event: {}", event);
+            // Without a task there is nothing to operate on; continuing would throw a NullPointerException.
+            return;
+        }
+        switch (event.getTaskOperationType()) {
+            case DISPATCH:
+                taskExecuteRunnable.dispatch();
+                break;
+            case KILL:
+                taskExecuteRunnable.kill();
+                break;
+            case PAUSE:
+                taskExecuteRunnable.pause();
+                break;
+            default:
+                log.error("Unknown TaskOperationType for event: {}", event);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
new file mode 100644
index 0000000000..0029342f62
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskOperationType.java
@@ -0,0 +1,8 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * The operations that can be requested for a task: dispatch, kill or pause.
+ */
+public enum TaskOperationType {
+
+    DISPATCH, KILL, PAUSE,
+    ;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
new file mode 100644
index 0000000000..950db9b468
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Task event identifying a task instance and its workflow instance.
+ * NOTE(review): by its name this signals that the task finished successfully - confirm with the producer.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskSuccessEvent implements ITaskEvent {
+
+    private Integer workflowInstanceId;
+
+    private Integer taskInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
new file mode 100644
index 0000000000..48cf50b3b8
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/TaskSuccessEventOperator.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class TaskSuccessEventOperator implements ITaskEventOperator {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(ITaskEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        Integer taskInstanceId = event.getTaskInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getByProcessInstanceId(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            log.error("Cannot find the WorkflowExecutionRunnable, the event: 
{} will be dropped", event);
+            return;
+        }
+
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
new file mode 100644
index 0000000000..e9de91a6cf
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEvent.java
@@ -0,0 +1,18 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event carrying the id of a workflow instance together with a textual failure reason.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowFailedEvent implements IWorkflowEvent, ISyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private Integer workflowInstanceId;
+
+    /** Free-text description of why the workflow failed. */
+    private String failedReason;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
new file mode 100644
index 0000000000..9400209224
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFailedEventOperator.java
@@ -0,0 +1,43 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link WorkflowFailedEvent}: sets the workflow instance state to FAILURE, persists it
+ * and then dispatches a {@link WorkflowFinalizeEvent} for the same workflow instance.
+ */
+@Slf4j
+@Component
+public class WorkflowFailedEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowFailedEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Autowired
+    private EventDispatcher eventDispatcher;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    /**
+     * Marks the workflow instance referenced by the event as failed and triggers finalization.
+     * When no runnable is registered for the id the event is logged and dropped.
+     *
+     * @param event the failure event to handle
+     */
+    @Override
+    public void handleEvent(WorkflowFailedEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            // Guard against a missing runnable instead of failing with a NullPointerException below.
+            log.error("Cannot find the WorkflowExecutionRunnable, the event: {} will be dropped", event);
+            return;
+        }
+
+        ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
+        workflowInstance.setState(WorkflowExecutionStatus.FAILURE);
+        processInstanceDao.updateById(workflowInstance);
+        log.info("Handle WorkflowExecutionRunnableFailedEvent success, set workflowInstance status to {}",
+                workflowInstance.getState());
+
+        eventDispatcher.dispatchEvent(new WorkflowFinalizeEvent(workflowInstanceId));
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
new file mode 100644
index 0000000000..e4a4acebb3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEvent.java
@@ -0,0 +1,15 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event that only carries the id of the workflow instance to finalize.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class WorkflowFinalizeEvent implements IWorkflowEvent, ISyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private Integer workflowInstanceId;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
new file mode 100644
index 0000000000..4d4055c8db
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinalizeEventOperator.java
@@ -0,0 +1,26 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowFinalizeEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowFinalizeEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(WorkflowFinalizeEvent event) {
+        Integer workflowInstanceId = event.getWorkflowInstanceId();
+        
workflowExecuteRunnableRepository.removeByProcessInstanceId(workflowInstanceId);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
new file mode 100644
index 0000000000..db49459862
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowFinishEvent.java
@@ -0,0 +1,20 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event carrying the id of a workflow instance and a {@link WorkflowExecutionStatus}.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowFinishEvent implements IWorkflowEvent, ISyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private Integer workflowInstanceId;
+
+    /** Execution status attached to the event. */
+    private WorkflowExecutionStatus workflowExecutionStatus;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
new file mode 100644
index 0000000000..e0aee2d29b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEvent.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Workflow event requesting an operation ({@link WorkflowOperationType}) on a workflow instance.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Slf4j
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowOperationEvent implements IWorkflowEvent, ISyncEvent {
+
+    /** Id of the workflow instance the operation targets. */
+    private Integer workflowInstanceId;
+    /** Operation to perform on the workflow instance. */
+    private WorkflowOperationType workflowOperationType;
+
+    /**
+     * Creates an event for the given workflow instance and operation.
+     *
+     * @param workflowInstanceId    id of the targeted workflow instance
+     * @param workflowOperationType operation to perform
+     * @return a new event populated with both values
+     */
+    public static WorkflowOperationEvent of(Integer workflowInstanceId, WorkflowOperationType workflowOperationType) {
+        // The all-args constructor takes the fields in declaration order.
+        return new WorkflowOperationEvent(workflowInstanceId, workflowOperationType);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
new file mode 100644
index 0000000000..4231fb160c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationEventOperator.java
@@ -0,0 +1,68 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowOperationEventOperator implements 
IWorkflowEventOperator<WorkflowOperationEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository 
workflowExecuteRunnableRepository;
+
+    @Override
+    public void handleEvent(WorkflowOperationEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.warn(
+                    "Handle workflowExecutionRunnableKillOperationEvent: {} 
failed: WorkflowExecutionRunnable not found",
+                    event);
+            return;
+        }
+        switch (event.getWorkflowOperationType()) {
+            case TRIGGER:
+                triggerWorkflow(workflowExecutionRunnable);
+                break;
+            case PAUSE:
+                pauseWorkflow(workflowExecutionRunnable);
+                break;
+            case KILL:
+                killWorkflow(workflowExecutionRunnable);
+                break;
+            default:
+                log.error("Unknown operationType for event: {}", event);
+        }
+    }
+
+    private void triggerWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        try {
+            workflowExecutionRunnable.start();
+        } catch (Throwable exception) {
+            if (ExceptionUtils.isDatabaseConnectedFailedException(exception)) {
+                throw exception;
+            }
+            ProcessInstance workflowInstance =
+                    
workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstance();
+            log.error("Trigger workflow: {} failed", 
workflowInstance.getName(), exception);
+            WorkflowFailedEvent workflowExecutionRunnableFailedEvent = 
WorkflowFailedEvent.builder()
+                    
.workflowInstanceId(workflowInstance.getId()).failedReason(exception.getMessage()).build();
+            
workflowExecutionRunnable.getEventRepository().storeEventToTail(workflowExecutionRunnableFailedEvent);
+        }
+    }
+
+    private void pauseWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        workflowExecutionRunnable.pause();
+    }
+
+    private void killWorkflow(IWorkflowExecutionRunnable 
workflowExecutionRunnable) {
+        workflowExecutionRunnable.kill();
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
new file mode 100644
index 0000000000..296c5c8dd6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowOperationType.java
@@ -0,0 +1,10 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+/**
+ * Operations that can be requested on a workflow instance.
+ * The declaration order (TRIGGER, PAUSE, KILL) is kept unchanged.
+ */
+public enum WorkflowOperationType {
+
+    /** Start the workflow. */
+    TRIGGER,
+    /** Pause the workflow. */
+    PAUSE,
+    /** Kill the workflow. */
+    KILL,
+    ;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
new file mode 100644
index 0000000000..ee3c6843a4
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event that only carries the id of the workflow instance that timed out.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowTimeoutEvent implements IWorkflowEvent, IAsyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private Integer workflowInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
new file mode 100644
index 0000000000..89014b8d9d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTimeoutEventOperator.java
@@ -0,0 +1,43 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkflowTimeoutEventOperator
+        implements
+            IEventOperator<WorkflowTimeoutEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> 
workflowExecutionRunnableRepository;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ProcessAlertManager processAlertManager;
+
+    @Override
+    public void handleEvent(WorkflowTimeoutEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                
workflowExecutionRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            log.warn("Cannot find the workflow instance by id: {}", 
event.getWorkflowInstanceId());
+            return;
+        }
+        // we only support timeout warning for now
+        ProcessInstance workflowInstance = 
workflowExecutionRunnable.getWorkflowInstance();
+        ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
+        processAlertManager.sendProcessTimeoutAlert(workflowInstance, 
projectUser);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
new file mode 100644
index 0000000000..45fe8cab46
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEvent.java
@@ -0,0 +1,21 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event asking for the tasks that follow a given parent task to be triggered.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowTriggerNextTaskEvent implements IWorkflowEvent, ISyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private int workflowInstanceId;
+
+    /**
+     * Name of the parent task node; {@code null} when the task to trigger is a root task.
+     */
+    private String parentTaskNodeName;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
new file mode 100644
index 0000000000..50535c2b2c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggerNextTaskEventOperator.java
@@ -0,0 +1,27 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import 
org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link WorkflowTriggerNextTaskEvent} by asking the workflow's DAG engine to trigger
+ * the tasks that follow the event's parent task node.
+ */
+@Slf4j
+@Component
+public class WorkflowTriggerNextTaskEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowTriggerNextTaskEvent> {
+
+    @Autowired
+    private IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> workflowExecuteRunnableRepository;
+
+    /**
+     * Triggers the next tasks of the referenced workflow instance. When no runnable is registered
+     * for the id the event is logged and dropped, matching the other operators in this package.
+     *
+     * @param event the trigger-next-task event to handle
+     */
+    @Override
+    public void handleEvent(WorkflowTriggerNextTaskEvent event) {
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                workflowExecuteRunnableRepository.getByProcessInstanceId(event.getWorkflowInstanceId());
+        if (workflowExecutionRunnable == null) {
+            // Guard against a missing runnable instead of failing with a NullPointerException below.
+            log.error("Cannot find the WorkflowExecutionRunnable, the event: {} will be dropped", event);
+            return;
+        }
+        workflowExecutionRunnable.getDagEngine().triggerNextTasks(event.getParentTaskNodeName());
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
new file mode 100644
index 0000000000..0c5283733d
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEvent.java
@@ -0,0 +1,16 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Workflow event that only carries the id of the workflow instance that has been triggered.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class WorkflowTriggeredEvent implements IWorkflowEvent, IAsyncEvent {
+
+    /** Id of the workflow instance this event refers to. */
+    private int workflowInstanceId;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
new file mode 100644
index 0000000000..e61e1f90c3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/events/WorkflowTriggeredEventOperator.java
@@ -0,0 +1,44 @@
+package org.apache.dolphinscheduler.server.master.events;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import 
org.apache.dolphinscheduler.server.master.dag.IWorkflowExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.dag.WorkflowExecuteRunnableRepository;
+import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link WorkflowTriggeredEvent}: increments the "submit" process-instance metric for the
+ * workflow definition and publishes a process-start listener event.
+ */
+@Slf4j
+@Component
+public class WorkflowTriggeredEventOperator
+        implements
+            IWorkflowEventOperator<WorkflowTriggeredEvent> {
+
+    @Autowired
+    private WorkflowExecuteRunnableRepository workflowExecuteRunnableRepository;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ListenerEventAlertManager listenerEventAlertManager;
+
+    /**
+     * Records the metric and publishes the start listener event for the referenced workflow instance.
+     * When no runnable is registered for the id the event is logged and dropped.
+     *
+     * @param event the triggered event to handle
+     */
+    @Override
+    public void handleEvent(WorkflowTriggeredEvent event) {
+        int workflowInstanceId = event.getWorkflowInstanceId();
+        IWorkflowExecutionRunnable workflowExecutionRunnable =
+                workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
+        if (workflowExecutionRunnable == null) {
+            // Guard against a missing runnable instead of failing with a NullPointerException below.
+            log.error("Cannot find the WorkflowExecutionRunnable, the event: {} will be dropped", event);
+            return;
+        }
+        Long workflowDefinitionCode =
+                workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowDefinition().getCode();
+        ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit",
+                String.valueOf(workflowDefinitionCode));
+
+        ProcessInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
+        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstanceId);
+        listenerEventAlertManager.publishProcessStartListenerEvent(workflowInstance, projectUser);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
new file mode 100644
index 0000000000..19a11b9639
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecuteRunnableNotFoundException.java
@@ -0,0 +1,12 @@
+package org.apache.dolphinscheduler.server.master.exception;
+
+/**
+ * Thrown when a TaskExecuteRunnable cannot be found.
+ * NOTE(review): the constructor parameters are named after the workflow instance; confirm with
+ * the callers whether the id/name identify the workflow instance or the task.
+ */
+public class TaskExecuteRunnableNotFoundException extends RuntimeException {
+
+    /**
+     * @param workflowInstanceId id included in the exception message
+     */
+    public TaskExecuteRunnableNotFoundException(Integer workflowInstanceId) {
+        // The message names this exception's own runnable type rather than the workflow one.
+        super("TaskExecuteRunnable not found: [id=" + workflowInstanceId + "]");
+    }
+
+    /**
+     * @param workflowInstanceName name included in the exception message
+     */
+    public TaskExecuteRunnableNotFoundException(String workflowInstanceName) {
+        super("TaskExecuteRunnable not found: [name=" + workflowInstanceName + "]");
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
deleted file mode 100644
index ac37c94438..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.exception;
-
-public class TaskExecutionContextCreateException extends MasterException {
-
-    public TaskExecutionContextCreateException(String message) {
-        super(message);
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
new file mode 100644
index 0000000000..3ded58f3c6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/WorkflowExecuteRunnableNotFoundException.java
@@ -0,0 +1,13 @@
+package org.apache.dolphinscheduler.server.master.exception;
+
+/**
+ * Thrown when no WorkflowExecuteRunnable can be found for a workflow instance,
+ * identified either by name or by id.
+ */
+public class WorkflowExecuteRunnableNotFoundException extends RuntimeException {
+
+    /**
+     * @param workflowInstanceName name of the workflow instance that was looked up
+     */
+    public WorkflowExecuteRunnableNotFoundException(String workflowInstanceName) {
+        super("WorkflowExecuteRunnable not found: [name=" + workflowInstanceName + "]");
+    }
+
+    /**
+     * @param workflowInstanceId id of the workflow instance that was looked up
+     */
+    public WorkflowExecuteRunnableNotFoundException(Integer workflowInstanceId) {
+        super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + "]");
+    }
+
+}


Reply via email to