This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f639a2eed4 [Fix-10854] Fix database restart may lost task instance 
status (#10866)
f639a2eed4 is described below

commit f639a2eed464abc0045d970b2e61397b4c488772
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 11 09:57:00 2022 +0800

    [Fix-10854] Fix database restart may lost task instance status (#10866)
    
    * Fix the task instance status not being rolled back when a database update fails
    
    * Fix database errors that could leave a workflow stuck in running status
---
 .../common/enums/StateEventType.java               |   2 +-
 .../enums/{Event.java => TaskEventType.java}       |   2 +-
 .../dao/utils/TaskInstanceUtils.java               |  79 +++++++
 .../dao/utils/TaskInstanceUtilsTest.java           |  43 ++++
 .../server/master/MasterServer.java                |  10 +-
 .../server/master/event/TaskDelayEventHandler.java | 120 +++++++++++
 .../master/event/TaskDispatchEventHandler.java     |  88 ++++++++
 .../server/master/event/TaskEventHandleError.java  |  17 +-
 .../master/event/TaskEventHandleException.java     |  17 +-
 .../server/master/event/TaskEventHandler.java      |  22 +-
 .../event/TaskRejectByWorkerEventHandler.java      |  77 +++++++
 .../master/event/TaskResultEventHandler.java       | 117 ++++++++++
 .../master/event/TaskRunningEventHandler.java      | 118 ++++++++++
 .../server/master/event/WorkflowEvent.java         |  19 +-
 .../master/event/WorkflowEventHandleError.java     |  17 +-
 .../master/event/WorkflowEventHandleException.java |  17 +-
 .../server/master/event/WorkflowEventHandler.java  |  19 +-
 .../server/master/event/WorkflowEventQueue.java    |  48 +++++
 .../server/master/event/WorkflowEventType.java     |  13 +-
 .../master/event/WorkflowStartEventHandler.java    |  86 ++++++++
 .../master/event/WorkflowStateEventHandler.java    |   9 +-
 .../master/processor/StateEventProcessor.java      |  11 +-
 .../processor/queue/StateEventResponseService.java |   6 +-
 .../server/master/processor/queue/TaskEvent.java   |  12 +-
 .../processor/queue/TaskExecuteRunnable.java       | 210 +++---------------
 .../processor/queue/TaskExecuteThreadPool.java     |  31 +--
 ...rService.java => MasterSchedulerBootstrap.java} | 183 +++++-----------
 .../server/master/runner/WorkflowEventLooper.java  | 108 ++++++++++
 .../master/runner/WorkflowExecuteRunnable.java     | 240 ++++++++++++---------
 .../remote/command/TaskRecallCommand.java          |   1 -
 .../service/process/ProcessServiceImpl.java        |  20 +-
 .../server/worker/cache/ResponseCache.java         |   4 +-
 .../worker/processor/TaskCallbackService.java      |   8 +-
 .../worker/runner/RetryReportTaskStatusThread.java |  14 +-
 34 files changed, 1253 insertions(+), 535 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index 292202501c..bc021e5e08 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -21,7 +21,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
 
 public enum StateEventType {
 
-    PROCESS_STATE_CHANGE(0, "process statechange"),
+    PROCESS_STATE_CHANGE(0, "process state change"),
     TASK_STATE_CHANGE(1, "task state change"),
     PROCESS_TIMEOUT(2, "process timeout"),
     TASK_TIMEOUT(3, "task timeout"),
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
similarity index 97%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
index 26c3a3beab..09f85d3f17 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.common.enums;
 
-public enum Event {
+public enum TaskEventType {
     DISPATCH,
     DELAY,
     RUNNING,
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
new file mode 100644
index 0000000000..fa3bfec0ca
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+/**
+ * Static helpers for {@link TaskInstance}.
+ */
+public final class TaskInstanceUtils {
+
+    private TaskInstanceUtils() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Copy the properties of the given source {@link TaskInstance} into the target.
+     *
+     * <p>This is a shallow copy: reference-typed properties such as the process instance, process
+     * definition and task definition are shared between source and target rather than cloned.
+     * Only the properties set below are copied; any other state of the target is left untouched.
+     *
+     * <p>Event handlers use it to snapshot a task instance before modifying it in memory, so the
+     * modification can be rolled back when persisting it to the database fails.
+     *
+     * @param source the task instance to copy from, must not be null
+     * @param target the task instance to copy into, must not be null; its properties are overwritten
+     */
+    public static void copyTaskInstance(TaskInstance source, TaskInstance target) {
+        target.setId(source.getId());
+        target.setName(source.getName());
+        target.setTaskType(source.getTaskType());
+        target.setProcessInstanceId(source.getProcessInstanceId());
+        target.setTaskCode(source.getTaskCode());
+        target.setTaskDefinitionVersion(source.getTaskDefinitionVersion());
+        target.setProcessInstanceName(source.getProcessInstanceName());
+        target.setTaskGroupPriority(source.getTaskGroupPriority());
+        target.setState(source.getState());
+        target.setFirstSubmitTime(source.getFirstSubmitTime());
+        target.setSubmitTime(source.getSubmitTime());
+        target.setStartTime(source.getStartTime());
+        target.setEndTime(source.getEndTime());
+        target.setHost(source.getHost());
+        target.setExecutePath(source.getExecutePath());
+        target.setLogPath(source.getLogPath());
+        target.setRetryTimes(source.getRetryTimes());
+        target.setAlertFlag(source.getAlertFlag());
+        target.setProcessInstance(source.getProcessInstance());
+        target.setProcessDefine(source.getProcessDefine());
+        target.setTaskDefine(source.getTaskDefine());
+        target.setPid(source.getPid());
+        target.setAppLink(source.getAppLink());
+        target.setFlag(source.getFlag());
+        target.setDependency(source.getDependency());
+        // The task params must be copied before the switch dependency, because
+        // setSwitchDependency relies on the task params already being set.
+        // todo: remove this ordering dependency between the two setters.
+        target.setTaskParams(source.getTaskParams());
+        target.setSwitchDependency(source.getSwitchDependency());
+        target.setDuration(source.getDuration());
+        target.setMaxRetryTimes(source.getMaxRetryTimes());
+        target.setRetryInterval(source.getRetryInterval());
+        target.setTaskInstancePriority(source.getTaskInstancePriority());
+        target.setDependentResult(source.getDependentResult());
+        target.setWorkerGroup(source.getWorkerGroup());
+        target.setEnvironmentCode(source.getEnvironmentCode());
+        target.setEnvironmentConfig(source.getEnvironmentConfig());
+        target.setExecutorId(source.getExecutorId());
+        target.setVarPool(source.getVarPool());
+        target.setExecutorName(source.getExecutorName());
+        target.setResources(source.getResources());
+        target.setDelayTime(source.getDelayTime());
+        target.setDryRun(source.getDryRun());
+        target.setTaskGroupId(source.getTaskGroupId());
+        target.setCpuQuota(source.getCpuQuota());
+        target.setMemoryMax(source.getMemoryMax());
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
new file mode 100644
index 0000000000..497c2fb881
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Date;
+import java.util.HashMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TaskInstanceUtilsTest {
+
+    @Test
+    void copyTaskInstance() {
+        TaskInstance source = new TaskInstance();
+        source.setId(1);
+        source.setName("source");
+        source.setSubmitTime(new Date());
+        // the switch dependency copy relies on the task params, so they must be set
+        source.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
+        TaskInstance target = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(source, target);
+        Assertions.assertEquals(source.getId(), target.getId());
+        Assertions.assertEquals(source.getName(), target.getName());
+        Assertions.assertEquals(source.getSubmitTime(), target.getSubmitTime());
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d26d33c062..3469cd4852 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,7 +26,7 @@ import 
org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
+import 
org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
@@ -59,7 +59,7 @@ public class MasterServer implements IStoppable {
     private TaskPluginManager taskPluginManager;
 
     @Autowired
-    private MasterSchedulerService masterSchedulerService;
+    private MasterSchedulerBootstrap masterSchedulerBootstrap;
 
     @Autowired
     private SchedulerApi schedulerApi;
@@ -94,8 +94,8 @@ public class MasterServer implements IStoppable {
         this.masterRegistryClient.start();
         this.masterRegistryClient.setRegistryStoppable(this);
 
-        this.masterSchedulerService.init();
-        this.masterSchedulerService.start();
+        this.masterSchedulerBootstrap.init();
+        this.masterSchedulerBootstrap.start();
 
         this.eventExecuteService.start();
         this.failoverExecuteThread.start();
@@ -130,7 +130,7 @@ public class MasterServer implements IStoppable {
             ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
             // close
             this.schedulerApi.close();
-            this.masterSchedulerService.close();
+            this.masterSchedulerBootstrap.close();
             this.masterRPCServer.close();
             this.masterRegistryClient.closeRegistry();
             // close spring Context and will invoke method with @PreDestroy 
annotation to destroy beans.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
new file mode 100644
index 0000000000..8e2dcced23
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link TaskEventType#DELAY} events reported by workers.
+ */
+@Component
+public class TaskDelayEventHandler implements TaskEventHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskDelayEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    /**
+     * Persist the state, start time, host, log path, execute path, pid and app link reported by the
+     * delay event, acknowledge the event to the worker and notify the workflow of the state change.
+     *
+     * <p>Events for an unknown task instance, or for a task instance that has already finished, are
+     * acknowledged and ignored. If persisting the task instance fails, its in-memory copy is rolled
+     * back, no ack is sent (so the worker will retry) and a {@link TaskEventHandleError} is thrown.
+     *
+     * @param taskEvent the delay event to handle
+     * @throws TaskEventHandleError if the workflow instance is not cached or the update cannot be persisted
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError("Cannot find related workflow instance from cache");
+        }
+        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            sendAckToWorker(taskEvent);
+            return;
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        if (taskInstance.getState().typeIsFinished()) {
+            logger.warn(
+                "The current task status is: {}, will not handle the delay event, "
+                    + "this event is outdated and will be discarded: {}",
+                taskInstance.getState(),
+                taskEvent);
+            sendAckToWorker(taskEvent);
+            return;
+        }
+
+        // Snapshot the in-memory task instance so it can be restored if the database update fails.
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        try {
+            taskInstance.setState(taskEvent.getState());
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setLogPath(taskEvent.getLogPath());
+            taskInstance.setExecutePath(taskEvent.getExecutePath());
+            taskInstance.setPid(taskEvent.getProcessId());
+            taskInstance.setAppLink(taskEvent.getAppIds());
+            if (!processService.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
+            }
+        } catch (TaskEventHandleError ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw ex;
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed", ex);
+        }
+        // The update is persisted at this point, so acknowledging happens outside the rollback scope:
+        // a failure while sending the ack must not revert in-memory state that is already in the database.
+        sendAckToWorker(taskEvent);
+
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * Acknowledge the event to the worker; without an ack the worker will retry the event.
+     */
+    private void sendAckToWorker(TaskEvent taskEvent) {
+        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.DELAY;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
new file mode 100644
index 0000000000..9378f0c36e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link TaskEventType#DISPATCH} events, marking the task instance as dispatched to a worker.
+ */
+@Component
+public class TaskDispatchEventHandler implements TaskEventHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskDispatchEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Move a {@link ExecutionStatus#SUBMITTED_SUCCESS} task instance to {@link ExecutionStatus#DISPATCH},
+     * record the worker host and persist the change. Events for task instances in any other state are
+     * ignored. If persisting fails, the in-memory task instance is rolled back.
+     *
+     * @param taskEvent the dispatch event to handle
+     * @throws TaskEventHandleError if the workflow or task instance is not cached, or the update cannot
+     *                              be persisted
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            throw new TaskEventHandleError("Cannot find related workflow instance from cache");
+        }
+        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId)
+            .orElseThrow(() -> new TaskEventHandleError("Cannot find related taskInstance from cache"));
+        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
+            logger.warn(
+                "The current taskInstance status is not SUBMITTED_SUCCESS, so the dispatch event will be discarded, "
+                    + "the current is a delay event, event: {}",
+                taskEvent);
+            return;
+        }
+
+        // todo: only the state and host need to be remembered and rolled back, copying every field is unnecessary
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        // update the taskInstance status
+        taskInstance.setState(ExecutionStatus.DISPATCH);
+        taskInstance.setHost(taskEvent.getWorkerAddress());
+        try {
+            if (!processService.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
+            }
+        } catch (TaskEventHandleError ex) {
+            // rollback status
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw ex;
+        } catch (Exception ex) {
+            // rollback status
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed", ex);
+        }
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.DISPATCH;
+    }
+}
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
similarity index 72%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
index 26c3a3beab..deae719f4b 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Thrown by a {@link TaskEventHandler} when a task event cannot be handled and should be discarded
+ * rather than retried.
+ */
+public class TaskEventHandleError extends Exception {
+
+    /**
+     * Create an error with the given detail message.
+     *
+     * @param message why the event could not be handled
+     */
+    public TaskEventHandleError(String message) {
+        super(message);
+    }
+
+    /**
+     * Create an error with the given detail message and underlying cause.
+     *
+     * @param message why the event could not be handled
+     * @param cause   the failure that prevented the event from being handled
+     */
+    public TaskEventHandleError(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
similarity index 71%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
index 26c3a3beab..e53a2bbcc3 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
@@ -15,12 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Thrown by a {@link TaskEventHandler} when a task event could not be handled and should be retried
+ * later.
+ */
+public class TaskEventHandleException extends Exception {
+
+    /**
+     * Create an exception with the given detail message.
+     *
+     * @param message why the event could not be handled
+     */
+    public TaskEventHandleException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create an exception with the given detail message and underlying cause.
+     *
+     * @param message why the event could not be handled
+     * @param cause   the failure that prevented the event from being handled
+     */
+    public TaskEventHandleException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
similarity index 57%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
index 26c3a3beab..5d214cbb5a 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
@@ -15,12 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+
+/**
+ * Handler for a single {@link TaskEventType}; implementations report which type they handle through
+ * {@link #getHandleEventType()}.
+ */
+public interface TaskEventHandler {
+
+    /**
+     * Handle the task event.
+     *
+     * @param taskEvent the task event to handle
+     * @throws TaskEventHandleError     if the event cannot be handled and will be discarded
+     * @throws TaskEventHandleException if the event could not be handled and needs to be retried
+     */
+    void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException;
+
+    /**
+     * @return the task event type this handler is responsible for
+     */
+    TaskEventType getHandleEventType();
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
new file mode 100644
index 0000000000..d09a8364ca
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link TaskEventType#WORKER_REJECT} events: the rejected task is resubmitted by its workflow.
+ */
+@Component
+public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    /**
+     * Resubmit the rejected task through its workflow and acknowledge the event to the worker.
+     *
+     * <p>If the workflow instance or the task instance cannot be found in the cache, the event is
+     * still acknowledged and then discarded.
+     *
+     * @param taskEvent the reject event to handle
+     * @throws TaskEventHandleError if the workflow or task instance is not cached, or resubmitting fails
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
+        }
+        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
+            sendAckToWorker(taskEvent);
+            return new TaskEventHandleError(
+                "Handle task reject event error, cannot find the taskInstance from cache, will discard this event");
+        });
+        try {
+            // todo: If the worker submits multiple reject responses to the master, the task instance may be
+            // dispatched multiple times; worker overload should be controlled by the master rather than by
+            // the worker.
+            // If the task is resubmitted and the worker fails over, could this task be dispatched twice?
+            // todo: clear the taskInstance host and roll the status back to submitted.
+            workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
+            sendAckToWorker(taskEvent);
+        } catch (Exception ex) {
+            throw new TaskEventHandleError("Handle task reject event error", ex);
+        }
+    }
+
+    /**
+     * Acknowledge the reject event to the worker with a {@link TaskRecallAckCommand}.
+     */
+    public void sendAckToWorker(TaskEvent taskEvent) {
+        TaskRecallAckCommand taskRecallAckCommand =
+            new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.WORKER_REJECT;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
new file mode 100644
index 0000000000..67df03682f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import 
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskResultEventHandler implements TaskEventHandler {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Handle a RESULT event reported by a worker.
+     * <p>
+     * The cached task instance is updated from the event and persisted. Persisting counts as failed if it throws or
+     * if {@code updateTaskInstance} returns {@code false}. On failure, the in-memory task instance is restored from
+     * a snapshot taken before the update and no ack is sent to the worker. On success, an ack is sent and a
+     * TASK_STATE_CHANGE state event is submitted to the workflow execute thread pool.
+     *
+     * @param taskEvent the result event received from the worker
+     * @throws TaskEventHandleError if the workflow/task instance is not cached, if the task instance is already
+     *                              finished, or if saving the task instance to the db fails
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, cannot find related workflow instance from cache, will discard this event");
+        }
+        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, cannot find the taskInstance from cache, will discard this event");
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        if (taskInstance.getState().typeIsFinished()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task result event error, the task instance is already finished, will discard this event");
+        }
+        dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+        // Snapshot the in-memory state so it can be restored if persisting the new state fails.
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        try {
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setLogPath(taskEvent.getLogPath());
+            taskInstance.setExecutePath(taskEvent.getExecutePath());
+            taskInstance.setPid(taskEvent.getProcessId());
+            taskInstance.setAppLink(taskEvent.getAppIds());
+            taskInstance.setState(taskEvent.getState());
+            taskInstance.setEndTime(taskEvent.getEndTime());
+            taskInstance.setVarPool(taskEvent.getVarPool());
+            processService.changeOutParam(taskInstance);
+            // updateTaskInstance reports some failures through its return value instead of an exception; treat
+            // both the same, otherwise the worker is acked for a status that was never saved.
+            if (!processService.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task result event error, save taskInstance to db failed");
+            }
+            sendAckToWorker(taskEvent);
+        } catch (TaskEventHandleError ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw ex;
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
+        }
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+
+    }
+
+    /**
+     * Send a {@link TaskExecuteResponseAckCommand} with {@code ExecutionStatus.SUCCESS} for this event's task
+     * instance back over the channel the event arrived on.
+     *
+     * @param taskEvent the event being acknowledged
+     */
+    public void sendAckToWorker(TaskEvent taskEvent) {
+        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
+            new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.RESULT;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
new file mode 100644
index 0000000000..31152973a2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskRunningEventHandler implements TaskEventHandler {
+    private final Logger logger = LoggerFactory.getLogger(TaskRunningEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * Handle a RUNNING event reported by a worker.
+     * <p>
+     * The cached task instance is updated from the event: state, start time, host, log/execute path, pid and app
+     * link. The update is then persisted. Persisting counts as failed if it throws or if {@code updateTaskInstance}
+     * returns {@code false}. On failure, the in-memory task instance is restored from a snapshot taken before the
+     * update and no ack is sent. On success, an ack is sent to the worker and a TASK_STATE_CHANGE state event is
+     * submitted to the workflow execute thread pool.
+     *
+     * @param taskEvent the running event received from the worker
+     * @throws TaskEventHandleError if the workflow/task instance is not cached, if the task instance is already
+     *                              finished, or if updating the task instance in the db fails
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task running event error, cannot find related workflow instance from cache, will discard this event");
+        }
+        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task running event error, cannot find the taskInstance from cache, will discard this event");
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        if (taskInstance.getState().typeIsFinished()) {
+            // A RUNNING event arriving after the task already finished is stale; ack it so it is not resent.
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                "Handle task running event error, this task instance is already finished, this event is delay, will discard this event");
+        }
+
+        // Snapshot the in-memory state so it can be restored if persisting the new state fails.
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        try {
+            taskInstance.setState(taskEvent.getState());
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setLogPath(taskEvent.getLogPath());
+            taskInstance.setExecutePath(taskEvent.getExecutePath());
+            taskInstance.setPid(taskEvent.getProcessId());
+            taskInstance.setAppLink(taskEvent.getAppIds());
+            if (!processService.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
+            }
+            sendAckToWorker(taskEvent);
+        } catch (TaskEventHandleError ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw ex;
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex);
+        }
+
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * Send a {@link TaskExecuteRunningAckCommand} with {@code ExecutionStatus.SUCCESS} for this event's task
+     * instance back over the channel the event arrived on.
+     * <p>
+     * The ack is only sent once the event is handled (or deliberately discarded); otherwise the worker will retry
+     * this event.
+     *
+     * @param taskEvent the event being acknowledged
+     */
+    private void sendAckToWorker(TaskEvent taskEvent) {
+        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.RUNNING;
+    }
+}
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
similarity index 75%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
index 26c3a3beab..5d5bc2eb32 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * An event placed on the workflow event queue and consumed by a {@link WorkflowEventHandler}: what kind of event it
+ * is, and which workflow instance it concerns.
+ */
+@Data
+@AllArgsConstructor
+public class WorkflowEvent {
+
+    /**
+     * Kind of event; selects the handler via {@link WorkflowEventHandler#getHandleWorkflowEventType()}.
+     */
+    private WorkflowEventType workflowEventType;
+
+    /**
+     * Id of the workflow (process) instance this event refers to; used as the process instance id when looking up
+     * the cached workflow runnable.
+     */
+    private int workflowInstanceId;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
similarity index 71%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
index 26c3a3beab..651f714037 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Thrown by a {@link WorkflowEventHandler} when a workflow event is broken and must be dropped rather than retried
+ * (for example, the referenced workflow instance is not in the cache).
+ */
+public class WorkflowEventHandleError extends Exception {
+
+    /**
+     * @param message why the event cannot be handled
+     */
+    public WorkflowEventHandleError(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message   why the event cannot be handled
+     * @param throwable the underlying cause
+     */
+    public WorkflowEventHandleError(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
similarity index 70%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
index 26c3a3beab..c9b84fd10d 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Thrown by a {@link WorkflowEventHandler} when handling a workflow event failed in a way that means the event
+ * should be retried.
+ */
+public class WorkflowEventHandleException extends Exception {
+
+    /**
+     * @param message why handling the event failed
+     */
+    public WorkflowEventHandleException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message   why handling the event failed
+     * @param throwable the underlying cause
+     */
+    public WorkflowEventHandleException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
similarity index 59%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
index 26c3a3beab..600d321566 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Handles one {@link WorkflowEventType} of {@link WorkflowEvent}.
+ */
+public interface WorkflowEventHandler {
+
+    /**
+     * Handle a workflow event.
+     *
+     * @param workflowEvent the event to handle
+     * @throws WorkflowEventHandleError     if the event is broken; the event should be dropped.
+     * @throws WorkflowEventHandleException if handling failed but may succeed later; the event should be retried.
+     */
+    void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError, WorkflowEventHandleException;
+
+    /**
+     * @return the type of workflow event this handler is responsible for
+     */
+    WorkflowEventType getHandleWorkflowEventType();
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
new file mode 100644
index 0000000000..86c8b90cfa
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Unbounded FIFO queue of {@link WorkflowEvent}s.
+ * <p>
+ * The backing queue is a static {@link LinkedBlockingQueue}, so every instance of this component shares the same
+ * events.
+ */
+@Component
+public class WorkflowEventQueue {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowEventQueue.class);
+
+    private static final LinkedBlockingQueue<WorkflowEvent> EVENTS = new LinkedBlockingQueue<>();
+
+    /**
+     * Append an event to the tail of the queue and log it at INFO level.
+     *
+     * @param workflowEvent the event to enqueue
+     */
+    public void addEvent(WorkflowEvent workflowEvent) {
+        EVENTS.add(workflowEvent);
+        LOGGER.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
+    }
+
+    /**
+     * Remove and return the head of the queue, blocking until an event is available.
+     * <p>
+     * NOTE: "pool" is presumably a misspelling of "poll"; the name is left unchanged so existing callers are
+     * unaffected.
+     *
+     * @return the oldest queued event
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public WorkflowEvent poolEvent() throws InterruptedException {
+        final WorkflowEvent head = EVENTS.take();
+        return head;
+    }
+
+}
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
similarity index 85%
rename from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
index 26c3a3beab..b0f5f09e30 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.server.master.event;
+
+/**
+ * Types of {@link WorkflowEvent} that can be placed on the workflow event queue.
+ */
+public enum WorkflowEventType {
+
+    /**
+     * Start a workflow instance; handled by {@link WorkflowStartEventHandler}.
+     */
+    START_WORKFLOW,
+    ;
 
-public enum Event {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
new file mode 100644
index 0000000000..b4d9fc1f85
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import 
org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WorkflowStartEventHandler implements WorkflowEventHandler {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private WorkflowEventQueue workflowEventQueue;
+
+    /**
+     * Start the workflow instance referenced by a START_WORKFLOW event.
+     * <p>
+     * The cached {@link WorkflowExecuteRunnable} is run asynchronously on the workflow execute thread pool. If it
+     * completes with {@link WorkflowSubmitStatue#SUCCESS}, the process instance is registered for timeout checking
+     * (when it has a timeout). In every other case, including when {@code call()} throws, a new START_WORKFLOW event
+     * is put back on the workflow event queue so the start is retried.
+     *
+     * @param workflowEvent the start event
+     * @throws WorkflowEventHandleError if the workflow instance cannot be found in the cache
+     */
+    @Override
+    public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
+        logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
+        if (workflowExecuteRunnable == null) {
+            throw new WorkflowEventHandleError(
+                "The workflow start event is invalid, cannot find the workflow instance from cache");
+        }
+        ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
+
+        ProcessInstanceMetrics.incProcessInstanceSubmit();
+        CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
+            CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
+        // whenComplete (unlike thenAccept) also runs when call() throws. Without it, the exception would be
+        // silently dropped and the workflow instance would never be started or retried.
+        workflowSubmitFuture.whenComplete((workflowSubmitStatue, throwable) -> {
+            if (throwable == null && WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
+                logger.info("Success submit the workflow instance");
+                if (processInstance.getTimeout() > 0) {
+                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                }
+                return;
+            }
+            // submit failed: resend the event to the workflow event queue so the start is retried
+            if (throwable != null) {
+                logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
+                             workflowEvent, throwable);
+            } else {
+                logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
+                             workflowEvent);
+            }
+            workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
+                                                          processInstance.getId()));
+        });
+    }
+
+    @Override
+    public WorkflowEventType getHandleWorkflowEventType() {
+        return WorkflowEventType.START_WORKFLOW;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
index 37d8ceb1da..3abdd879bb 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -41,15 +41,14 @@ public class WorkflowStateEventHandler implements 
StateEventHandler {
         ProcessInstance processInstance = 
workflowExecuteRunnable.getProcessInstance();
         ProcessDefinition processDefinition = 
processInstance.getProcessDefinition();
 
-        logger.info("process:{} state {} change to {}",
-            processInstance.getId(),
-            processInstance.getState(),
-            stateEvent.getExecutionStatus());
+        logger.info(
+            "Handle workflow instance state event, the current workflow 
instance state {} will be changed to {}",
+            processInstance.getState(), stateEvent.getExecutionStatus());
 
         if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
             // serial wait execution type needs to wake up the waiting process
             if (processDefinition.getExecutionType().typeIsSerialWait() || 
processDefinition.getExecutionType()
-                .typeIsSerialPriority()) {
+                                                                               
             .typeIsSerialPriority()) {
                 workflowExecuteRunnable.endProcess();
                 return true;
             }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index f90277d0eb..18afd11ae5 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
 
 import org.slf4j.Logger;
@@ -67,11 +67,12 @@ public class StateEventProcessor implements 
NettyRequestProcessor {
         stateEvent.setType(type);
 
         try {
-            
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), 
stateEvent.getTaskInstanceId());
+            
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
+                stateEvent.getTaskInstanceId());
 
-            logger.info("Received state event change command, event: {}", 
stateEvent);
-            stateEventResponseService.addResponse(stateEvent);
-        }finally {
+            logger.info("Received state change command, event: {}", 
stateEvent);
+            stateEventResponseService.addStateChangeEvent(stateEvent);
+        } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index bdf80bbee9..80c90ee1b7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -42,9 +42,6 @@ import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
 
-/**
- * task manager
- */
 @Component
 public class StateEventResponseService {
 
@@ -96,7 +93,7 @@ public class StateEventResponseService {
     /**
      * put task to attemptQueue
      */
-    public void addResponse(StateEvent stateEvent) {
+    public void addStateChangeEvent(StateEvent stateEvent) {
         try {
             // check the event is validated
             eventQueue.put(stateEvent);
@@ -154,6 +151,7 @@ public class StateEventResponseService {
             }
 
             WorkflowExecuteRunnable workflowExecuteThread = 
this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+            // We will refresh the task instance status first, if the refresh 
failed the event will not be removed
             switch (stateEvent.getType()) {
                 case TASK_STATE_CHANGE:
                     
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 842bcaf333..248d253739 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
@@ -83,7 +83,7 @@ public class TaskEvent {
     /**
      * ack / response
      */
-    private Event event;
+    private TaskEventType event;
 
     /**
      * varPool
@@ -102,7 +102,7 @@ public class TaskEvent {
         event.setProcessInstanceId(processInstanceId);
         event.setTaskInstanceId(taskInstanceId);
         event.setWorkerAddress(workerAddress);
-        event.setEvent(Event.DISPATCH);
+        event.setEvent(TaskEventType.DISPATCH);
         return event;
     }
 
@@ -116,7 +116,7 @@ public class TaskEvent {
         event.setLogPath(command.getLogPath());
         event.setChannel(channel);
         event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
-        event.setEvent(Event.RUNNING);
+        event.setEvent(TaskEventType.RUNNING);
         return event;
     }
 
@@ -134,7 +134,7 @@ public class TaskEvent {
         event.setVarPool(command.getVarPool());
         event.setChannel(channel);
         event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
-        event.setEvent(Event.RESULT);
+        event.setEvent(TaskEventType.RESULT);
         return event;
     }
 
@@ -143,7 +143,7 @@ public class TaskEvent {
         event.setTaskInstanceId(command.getTaskInstanceId());
         event.setProcessInstanceId(command.getProcessInstanceId());
         event.setChannel(channel);
-        event.setEvent(Event.WORKER_REJECT);
+        event.setEvent(TaskEventType.WORKER_REJECT);
         return event;
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 9fc96e7564..3c05671a10 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -17,29 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import 
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
-import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
+import 
org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
 
-import java.util.Optional;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.Channel;
-
 /**
  * task execute thread
  */
@@ -51,34 +40,37 @@ public class TaskExecuteRunnable implements Runnable {
 
     private final ConcurrentLinkedQueue<TaskEvent> events = new 
ConcurrentLinkedQueue<>();
 
-    private ProcessService processService;
-
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
-    private DataQualityResultOperator dataQualityResultOperator;
+    private final Map<TaskEventType, TaskEventHandler> taskEventHandlerMap;
 
-    public TaskExecuteRunnable(int processInstanceId, ProcessService 
processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
-                               ProcessInstanceExecCacheManager 
processInstanceExecCacheManager, DataQualityResultOperator 
dataQualityResultOperator) {
+    public TaskExecuteRunnable(int processInstanceId, Map<TaskEventType, 
TaskEventHandler> taskEventHandlerMap) {
         this.processInstanceId = processInstanceId;
-        this.processService = processService;
-        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
-        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
-        this.dataQualityResultOperator = dataQualityResultOperator;
+        this.taskEventHandlerMap = taskEventHandlerMap;
     }
 
     @Override
     public void run() {
         while (!this.events.isEmpty()) {
+            // we handle the task event belongs to one task serial, so if the 
event comes in wrong order,
             TaskEvent event = this.events.peek();
             try {
                 
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), 
event.getTaskInstanceId());
-                persist(event);
-            } catch (Exception e) {
-                logger.error("persist task event error, event:{}", event, e);
+                logger.info("Handle task event begin: {}", event);
+                
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
+                events.remove(event);
+                logger.info("Handle task event finished: {}", event);
+            } catch (TaskEventHandleException taskEventHandleException) {
+                // we don't need to resubmit this event, since the worker will 
resubmit this event
+                logger.error("Handle task event failed, this event will be 
retry later, event: {}", event,
+                    taskEventHandleException);
+            } catch (TaskEventHandleError taskEventHandleError) {
+                logger.error("Handle task event error, this event will be 
removed, event: {}", event,
+                    taskEventHandleError);
+                events.remove(event);
+            } catch (Exception unknownException) {
+                logger.error("Handle task event error, get a unknown 
exception, this event will be removed, event: {}",
+                    event, unknownException);
+                events.remove(event);
             } finally {
-                this.events.remove(event);
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
@@ -109,156 +101,4 @@ public class TaskExecuteRunnable implements Runnable {
         return this.events.add(event);
     }
 
-    /**
-     * persist task event
-     *
-     * @param taskEvent taskEvent
-     */
-    private void persist(TaskEvent taskEvent) throws Exception {
-        Event event = taskEvent.getEvent();
-        int taskInstanceId = taskEvent.getTaskInstanceId();
-        int processInstanceId = taskEvent.getProcessInstanceId();
-
-        Optional<TaskInstance> taskInstance;
-        WorkflowExecuteRunnable workflowExecuteRunnable =
-            
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-        if (workflowExecuteRunnable != null && 
workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
-            taskInstance = 
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
-        } else {
-            taskInstance = 
Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
-        }
-
-        boolean needToSendEvent = true;
-        switch (event) {
-            case DISPATCH:
-                needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
-                // dispatch event do not need to submit state event
-                break;
-            case DELAY:
-            case RUNNING:
-                needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
-                break;
-            case RESULT:
-                needToSendEvent = handleResultEvent(taskEvent, taskInstance);
-                break;
-            case WORKER_REJECT:
-                needToSendEvent =
-                    handleWorkerRejectEvent(taskEvent.getChannel(), 
taskInstance, workflowExecuteRunnable);
-                break;
-            default:
-                throw new IllegalArgumentException("invalid event type : " + 
event);
-        }
-        if (!needToSendEvent) {
-            logger.info("Handle task event: {} success, there is no need to 
send a StateEvent", taskEvent);
-            return;
-        }
-
-        StateEvent stateEvent = new StateEvent();
-        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
-        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
-        stateEvent.setExecutionStatus(taskEvent.getState());
-        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
-    /**
-     * handle dispatch event
-     */
-    private boolean handleDispatchEvent(TaskEvent taskEvent, 
Optional<TaskInstance> taskInstanceOptional) {
-        if (!taskInstanceOptional.isPresent()) {
-            logger.error("taskInstance is null");
-            return false;
-        }
-        TaskInstance taskInstance = taskInstanceOptional.get();
-        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
-            return false;
-        }
-        taskInstance.setState(ExecutionStatus.DISPATCH);
-        taskInstance.setHost(taskEvent.getWorkerAddress());
-        processService.saveTaskInstance(taskInstance);
-        return true;
-    }
-
-    /**
-     * handle running event
-     */
-    private boolean handleRunningEvent(TaskEvent taskEvent, 
Optional<TaskInstance> taskInstanceOptional) {
-        Channel channel = taskEvent.getChannel();
-        if (taskInstanceOptional.isPresent()) {
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            if (taskInstance.getState().typeIsFinished()) {
-                logger.warn("task is finish, running event is meaningless, 
taskInstanceId:{}, state:{}",
-                    taskInstance.getId(),
-                    taskInstance.getState());
-                return false;
-            } else {
-                taskInstance.setState(taskEvent.getState());
-                taskInstance.setStartTime(taskEvent.getStartTime());
-                taskInstance.setHost(taskEvent.getWorkerAddress());
-                taskInstance.setLogPath(taskEvent.getLogPath());
-                taskInstance.setExecutePath(taskEvent.getExecutePath());
-                taskInstance.setPid(taskEvent.getProcessId());
-                taskInstance.setAppLink(taskEvent.getAppIds());
-                processService.saveTaskInstance(taskInstance);
-            }
-        }
-        // if taskInstance is null (maybe deleted) or finish. retry will be 
meaningless . so ack success
-        // send ack to worker
-        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
-            new 
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskEvent.getTaskInstanceId());
-        channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        return true;
-    }
-
-    /**
-     * handle result event
-     */
-    private boolean handleResultEvent(TaskEvent taskEvent, 
Optional<TaskInstance> taskInstanceOptional) {
-        Channel channel = taskEvent.getChannel();
-        if (taskInstanceOptional.isPresent()) {
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            if (taskInstance.getState().typeIsFinished()) {
-                logger.warn("The current taskInstance has already been 
finished, taskEvent: {}", taskEvent);
-                return false;
-            }
-
-            dataQualityResultOperator.operateDqExecuteResult(taskEvent, 
taskInstance);
-
-            taskInstance.setStartTime(taskEvent.getStartTime());
-            taskInstance.setHost(taskEvent.getWorkerAddress());
-            taskInstance.setLogPath(taskEvent.getLogPath());
-            taskInstance.setExecutePath(taskEvent.getExecutePath());
-            taskInstance.setPid(taskEvent.getProcessId());
-            taskInstance.setAppLink(taskEvent.getAppIds());
-            taskInstance.setState(taskEvent.getState());
-            taskInstance.setEndTime(taskEvent.getEndTime());
-            taskInstance.setVarPool(taskEvent.getVarPool());
-            processService.changeOutParam(taskInstance);
-            processService.saveTaskInstance(taskInstance);
-        }
-        // if taskInstance is null (maybe deleted) . retry will be meaningless 
. so response success
-        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
-            new 
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskEvent.getTaskInstanceId());
-        channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
-        return true;
-    }
-
-    /**
-     * handle result event
-     */
-    private boolean handleWorkerRejectEvent(Channel channel,
-                                            Optional<TaskInstance> 
taskInstanceOptional,
-                                            WorkflowExecuteRunnable 
executeThread) throws Exception {
-        TaskInstance taskInstance =
-            taskInstanceOptional.orElseThrow(() -> new 
RuntimeException("taskInstance is null"));
-        if (executeThread != null) {
-            executeThread.resubmit(taskInstance.getTaskCode());
-        }
-        if (channel != null) {
-            TaskRecallAckCommand taskRecallAckCommand =
-                new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskInstance.getId());
-            channel.writeAndFlush(taskRecallAckCommand.convert2Command());
-        }
-        return true;
-    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index ae578d4c03..8d5f2fd3db 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -17,12 +17,14 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.PostConstruct;
@@ -48,21 +50,10 @@ public class TaskExecuteThreadPool extends 
ThreadPoolTaskExecutor {
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    /**
-     * data quality result operator
-     */
     @Autowired
-    private DataQualityResultOperator dataQualityResultOperator;
+    private List<TaskEventHandler> taskEventHandlerList;
 
-
-    @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private Map<TaskEventType, TaskEventHandler> taskEventHandlerMap = new 
HashMap<>();
 
     /**
      * task event thread map
@@ -75,6 +66,8 @@ public class TaskExecuteThreadPool extends 
ThreadPoolTaskExecutor {
         this.setThreadNamePrefix("Task-Execute-Thread-");
         this.setMaxPoolSize(masterConfig.getExecThreads());
         this.setCorePoolSize(masterConfig.getExecThreads());
+        taskEventHandlerList.forEach(
+            taskEventHandler -> 
taskEventHandlerMap.put(taskEventHandler.getHandleEventType(), 
taskEventHandler));
     }
 
     public void submitTaskEvent(TaskEvent taskEvent) {
@@ -83,11 +76,7 @@ public class TaskExecuteThreadPool extends 
ThreadPoolTaskExecutor {
             return;
         }
         TaskExecuteRunnable taskExecuteRunnable = 
taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
-            (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
-                processService,
-                workflowExecuteThreadPool,
-                processInstanceExecCacheManager,
-                dataQualityResultOperator));
+            (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, 
taskEventHandlerMap));
         taskExecuteRunnable.addEvent(taskEvent);
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
similarity index 57%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index e88d84312c..870cc1b24c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
 import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@@ -43,9 +46,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
@@ -53,18 +54,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import lombok.NonNull;
-
 /**
  * Master scheduler thread, this thread will consume the commands from 
database and trigger processInstance executed.
  */
 @Service
-public class MasterSchedulerService extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread {
 
-    /**
-     * logger of MasterSchedulerService
-     */
-    private static final Logger logger = 
LoggerFactory.getLogger(MasterSchedulerService.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
 
     @Autowired
     private ProcessService processService;
@@ -83,12 +79,6 @@ public class MasterSchedulerService extends BaseDaemonThread 
{
      */
     private ThreadPoolExecutor masterPrepareExecService;
 
-    /**
-     * workflow exec service
-     */
-    @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
@@ -98,13 +88,15 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
     @Autowired
     private CuringParamsService curingGlobalParamsService;
 
-    private final LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances = new LinkedBlockingQueue<>();
+    @Autowired
+    private WorkflowEventQueue workflowEventQueue;
 
-    private Thread failedProcessInstanceResubmitThread;
+    @Autowired
+    private WorkflowEventLooper workflowEventLooper;
 
     private String masterAddress;
 
-    protected MasterSchedulerService() {
+    protected MasterSchedulerBootstrap() {
         super("MasterCommandLoopThread");
     }
 
@@ -114,23 +106,19 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
     public void init() {
         this.masterPrepareExecService = (ThreadPoolExecutor) 
ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", 
masterConfig.getPreExecThreads());
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
-        this.failedProcessInstanceResubmitThread = new 
FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
-        
ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
     }
 
     @Override
     public synchronized void start() {
-        logger.info("Master schedule service starting..");
+        logger.info("Master schedule bootstrap starting..");
         super.start();
-        this.failedProcessInstanceResubmitThread.start();
-        logger.info("Master schedule service started...");
+        workflowEventLooper.start();
+        logger.info("Master schedule bootstrap started...");
     }
 
     public void close() {
-        logger.info("Master schedule service stopping...");
-        // these process instances will be failover, so we can safa clear here
-        submitFailedProcessInstances.clear();
-        logger.info("Master schedule service stopped...");
+        logger.info("Master schedule bootstrap stopping...");
+        logger.info("Master schedule bootstrap stopped...");
     }
 
     /**
@@ -140,15 +128,51 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
     public void run() {
         while (Stopper.isRunning()) {
             try {
-                boolean isOverload = 
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), 
masterConfig.getReservedMemory());
+                // todo: if the workflow event queue is much, we need to 
handle the back pressure
+                boolean isOverload =
+                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), 
masterConfig.getReservedMemory());
                 if (isOverload) {
                     MasterServerMetrics.incMasterOverload();
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     continue;
                 }
-                scheduleWorkflow();
+                List<Command> commands = findCommands();
+                if (CollectionUtils.isEmpty(commands)) {
+                    // indicate that no command ,sleep for 1s
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                    continue;
+                }
+
+                List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
+                if (CollectionUtils.isEmpty(processInstances)) {
+                    // indicate that the command transform to processInstance 
error, sleep for 1s
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                    continue;
+                }
+                MasterServerMetrics.incMasterConsumeCommand(commands.size());
+
+                processInstances.forEach(processInstance -> {
+                    try {
+                        
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+                        if 
(processInstanceExecCacheManager.contains(processInstance.getId())) {
+                            logger.error("The workflow instance is already 
been cached, this case shouldn't be happened");
+                        }
+                        WorkflowExecuteRunnable workflowRunnable = new 
WorkflowExecuteRunnable(processInstance,
+                                                                               
                processService,
+                                                                               
                nettyExecutorManager,
+                                                                               
                processAlertManager,
+                                                                               
                masterConfig,
+                                                                               
                stateWheelExecuteThread,
+                                                                               
                curingGlobalParamsService);
+                        
processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowRunnable);
+                        workflowEventQueue.addEvent(new 
WorkflowEvent(WorkflowEventType.START_WORKFLOW,
+                                                                      
processInstance.getId()));
+                    } finally {
+                        LoggerUtils.removeWorkflowInstanceIdMDC();
+                    }
+                });
             } catch (InterruptedException interruptedException) {
-                logger.warn("Master schedule service interrupted, close the 
loop", interruptedException);
+                logger.warn("Master schedule bootstrap interrupted, close the 
loop", interruptedException);
                 Thread.currentThread().interrupt();
                 break;
             } catch (Exception e) {
@@ -159,72 +183,9 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
         }
     }
 
-    /**
-     * Query command from database by slot, and transform to workflow 
instance, then submit to workflowExecuteThreadPool.
-     */
-    private void scheduleWorkflow() throws InterruptedException, 
MasterException {
-        List<Command> commands = findCommands();
-        if (CollectionUtils.isEmpty(commands)) {
-            // indicate that no command ,sleep for 1s
-            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            return;
-        }
-
-        List<ProcessInstance> processInstances = 
command2ProcessInstance(commands);
-        if (CollectionUtils.isEmpty(processInstances)) {
-            // indicate that the command transform to processInstance error, 
sleep for 1s
-            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            return;
-        }
-        MasterServerMetrics.incMasterConsumeCommand(commands.size());
-
-        for (ProcessInstance processInstance : processInstances) {
-            submitProcessInstance(processInstance);
-        }
-    }
-
-    private void submitProcessInstance(@NonNull ProcessInstance 
processInstance) {
-        try {
-            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-            logger.info("Master schedule service starting workflow instance");
-            final WorkflowExecuteRunnable workflowExecuteRunnable = new 
WorkflowExecuteRunnable(
-                processInstance
-                , processService
-                , nettyExecutorManager
-                , processAlertManager
-                , masterConfig
-                , stateWheelExecuteThread
-                , curingGlobalParamsService);
-
-            
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteRunnable);
-            if (processInstance.getTimeout() > 0) {
-                
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-            }
-            ProcessInstanceMetrics.incProcessInstanceSubmit();
-            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = 
CompletableFuture.supplyAsync(
-                workflowExecuteRunnable::call, workflowExecuteThreadPool);
-            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
-                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
-                    // submit failed
-                    
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                    
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                    submitFailedProcessInstances.add(processInstance);
-                }
-            });
-            logger.info("Master schedule service started workflow instance");
-
-        } catch (Exception ex) {
-            
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-            
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-            logger.info("Master submit workflow to thread pool failed, will 
remove workflow runnable from cache manager", ex);
-        } finally {
-            LoggerUtils.removeWorkflowInstanceIdMDC();
-        }
-    }
-
     private List<ProcessInstance> command2ProcessInstance(List<Command> 
commands) throws InterruptedException {
         long commandTransformStartTime = System.currentTimeMillis();
-        logger.info("Master schedule service transforming command to 
ProcessInstance, commandSize: {}", commands.size());
+        logger.info("Master schedule bootstrap transforming command to 
ProcessInstance, commandSize: {}", commands.size());
         List<ProcessInstance> processInstances = 
Collections.synchronizedList(new ArrayList<>(commands.size()));
         CountDownLatch latch = new CountDownLatch(commands.size());
         for (final Command command : commands) {
@@ -254,7 +215,7 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
 
         // make sure to finish handling command each time before next scan
         latch.await();
-        logger.info("Master schedule service transformed command to 
ProcessInstance, commandSize: {}, processInstanceSize: {}",
+        logger.info("Master schedule bootstrap transformed command to 
ProcessInstance, commandSize: {}, processInstanceSize: {}",
             commands.size(), processInstances.size());
         
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis()
 - commandTransformStartTime);
         return processInstances;
@@ -273,7 +234,7 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
             int pageSize = masterConfig.getFetchCommandNum();
             final List<Command> result = 
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, 
thisMasterSlot);
             if (CollectionUtils.isNotEmpty(result)) {
-                logger.info("Master schedule service loop command success, 
command size: {}, current slot: {}, total slot size: {}",
+                logger.info("Master schedule bootstrap loop command success, 
command size: {}, current slot: {}, total slot size: {}",
                     result.size(), thisMasterSlot, masterCount);
             }
             
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - 
scheduleStartTime);
@@ -297,34 +258,4 @@ public class MasterSchedulerService extends 
BaseDaemonThread {
         return state;
     }
 
-    private class FailedProcessInstanceResubmitThread extends Thread {
-
-        private final LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances;
-
-        public 
FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> 
submitFailedProcessInstances) {
-            logger.info("Starting workflow resubmit thread");
-            this.submitFailedProcessInstances = submitFailedProcessInstances;
-            this.setDaemon(true);
-            this.setName("SubmitFailedProcessInstanceHandleThread");
-            logger.info("Started workflow resubmit thread");
-        }
-
-        @Override
-        public void run() {
-            while (Stopper.isRunning()) {
-                try {
-                    ProcessInstance processInstance = 
submitFailedProcessInstances.take();
-                    submitProcessInstance(processInstance);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.warn("SubmitFailedProcessInstanceHandleThread has 
been interrupted, will return");
-                    break;
-                }
-
-                // avoid the failed-fast cause CPU higher
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
-            }
-        }
-    }
-
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
new file mode 100644
index 0000000000..ee2e70bfd0
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Daemon thread that takes {@link WorkflowEvent}s from the {@link WorkflowEventQueue} and dispatches each one to the
+ * {@link WorkflowEventHandler} registered for its {@link WorkflowEventType}.
+ *
+ * <p>A {@link WorkflowEventHandleException} or any other unexpected exception puts the event back on the queue and
+ * sleeps {@link Constants#SLEEP_TIME_MILLIS} before the next poll; a {@link WorkflowEventHandleError}, or an event
+ * whose type has no registered handler, is logged and dropped.
+ */
+@Component
+public class WorkflowEventLooper extends BaseDaemonThread {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowEventLooper.class);
+
+    @Autowired
+    private WorkflowEventQueue workflowEventQueue;
+
+    @Autowired
+    private List<WorkflowEventHandler> workflowEventHandlerList;
+
+    /** Handlers indexed by the event type they handle; filled once in {@link #init()} and not modified afterwards. */
+    private final Map<WorkflowEventType, WorkflowEventHandler> workflowEventHandlerMap = new HashMap<>();
+
+    protected WorkflowEventLooper() {
+        super("WorkflowEventLooper");
+    }
+
+    /**
+     * Indexes the injected handlers by {@link WorkflowEventHandler#getHandleWorkflowEventType()}; when two handlers
+     * declare the same type, the later one in the injected list wins.
+     */
+    @PostConstruct
+    public void init() {
+        workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(
+            workflowEventHandler.getHandleWorkflowEventType(), workflowEventHandler));
+    }
+
+    @Override
+    public synchronized void start() {
+        logger.info("WorkflowEventLooper thread starting");
+        super.start();
+        logger.info("WorkflowEventLooper thread started");
+    }
+
+    /**
+     * Polls and handles workflow events while {@link Stopper#isRunning()} returns true; the loop also ends as soon as
+     * an {@link InterruptedException} is caught, after restoring the thread's interrupt flag.
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            // Reset every iteration: if polling itself throws, the catch blocks must not re-queue the event that
+            // was already handled in the previous iteration.
+            WorkflowEvent workflowEvent = null;
+            try {
+                workflowEvent = workflowEventQueue.poolEvent();
+                LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
+                logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
+                WorkflowEventHandler workflowEventHandler =
+                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
+                if (workflowEventHandler == null) {
+                    // The handler map is fixed after init(), so re-queueing would make this event fail forever.
+                    logger.error("Cannot find a handler for workflow event type: {}, will drop this event, event: {}",
+                        workflowEvent.getWorkflowEventType(), workflowEvent);
+                    continue;
+                }
+                workflowEventHandler.handleWorkflowEvent(workflowEvent);
+            } catch (InterruptedException e) {
+                logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
+                Thread.currentThread().interrupt();
+                break;
+            } catch (WorkflowEventHandleException workflowEventHandleException) {
+                logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
+                    workflowEvent, workflowEventHandleException);
+                workflowEventQueue.addEvent(workflowEvent);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            } catch (WorkflowEventHandleError workflowEventHandleError) {
+                logger.error("Handle workflow event error, will drop this event, event: {}",
+                    workflowEvent, workflowEventHandleError);
+            } catch (Exception unknownException) {
+                logger.error(
+                    "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
+                    workflowEvent, unknownException);
+                if (workflowEvent != null) {
+                    workflowEventQueue.addEvent(workflowEvent);
+                }
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1834e413f8..94f5f51b80 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -119,39 +119,18 @@ import lombok.NonNull;
  */
 public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> 
{
 
-    /**
-     * logger of WorkflowExecuteThread
-     */
     private static final Logger logger = 
LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
 
-    /**
-     * process service
-     */
     private final ProcessService processService;
 
-    /**
-     * alert manager
-     */
     private final ProcessAlertManager processAlertManager;
 
-    /**
-     * netty executor manager
-     */
     private final NettyExecutorManager nettyExecutorManager;
 
-    /**
-     * process instance
-     */
     private final ProcessInstance processInstance;
 
-    /**
-     * process definition
-     */
     private ProcessDefinition processDefinition;
 
-    /**
-     * the object of DAG
-     */
     private DAG<String, TaskNode, TaskNodeRelation> dag;
 
     /**
@@ -159,10 +138,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      */
     private String key;
 
-    /**
-     * start flag, true: start nodes submit completely
-     */
-    private volatile boolean isStart = false;
+
+    private WorkflowRunnableStatus workflowRunnableStatus = 
WorkflowRunnableStatus.CREATED;
 
     /**
      * submit failure nodes
@@ -224,7 +201,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     private final ConcurrentLinkedQueue<StateEvent> stateEvents = new 
ConcurrentLinkedQueue<>();
 
     /**
-     * ready to submit task queue
+     * The StandBy task list, will be executed, need to know, the taskInstance 
in this queue may doesn't have id.
      */
     private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new 
PeerTaskInstancePriorityQueue();
 
@@ -234,14 +211,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      */
     private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new 
ConcurrentHashMap<>();
 
-    /**
-     * state wheel execute thread
-     */
     private final StateWheelExecuteThread stateWheelExecuteThread;
 
-    /**
-     * curing global params service
-     */
     private final CuringParamsService curingParamsService;
 
     private final String masterAddress;
@@ -276,14 +247,17 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      * the process start nodes are submitted completely.
      */
     public boolean isStart() {
-        return this.isStart;
+        return WorkflowRunnableStatus.STARTED == workflowRunnableStatus;
     }
 
     /**
      * handle event
      */
     public void handleEvents() {
-        if (!isStart) {
+        if (!isStart()) {
+            logger.info(
+                "The workflow instance is not started, will not handle its 
state event, current state event size: {}",
+                stateEvents);
             return;
         }
         StateEvent stateEvent = null;
@@ -387,45 +361,53 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     }
 
     public void taskFinished(TaskInstance taskInstance) throws 
StateEventHandleException {
-        logger.info("TaskInstance finished task code:{} state:{} ",
-            taskInstance.getTaskCode(),
-            taskInstance.getState());
-
-        activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
-        stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, 
taskInstance);
-        stateWheelExecuteThread.removeTask4RetryCheck(processInstance, 
taskInstance);
-        stateWheelExecuteThread.removeTask4StateCheck(processInstance, 
taskInstance);
-
-        if (taskInstance.getState().typeIsSuccess()) {
-            completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
-            // todo: merge the last taskInstance
-            processInstance.setVarPool(taskInstance.getVarPool());
-            processService.saveProcessInstance(processInstance);
-            if (!processInstance.isBlocked()) {
-                submitPostNode(Long.toString(taskInstance.getTaskCode()));
-            }
-        } else if (taskInstance.taskCanRetry() && processInstance.getState() 
!= ExecutionStatus.READY_STOP) {
-            // retry task
-            logger.info("Retry taskInstance taskInstance state: {}", 
taskInstance.getState());
-            retryTaskInstance(taskInstance);
-        } else if (taskInstance.getState().typeIsFailure()) {
-            completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
-            // There are child nodes and the failure policy is: CONTINUE
-            if (processInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE
-                && 
DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) 
{
-                submitPostNode(Long.toString(taskInstance.getTaskCode()));
-            } else {
-                errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
-                if (processInstance.getFailureStrategy() == 
FailureStrategy.END) {
-                    killAllTasks();
+        logger.info("TaskInstance finished task code:{} state:{}", 
taskInstance.getTaskCode(), taskInstance.getState());
+        try {
+
+            activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+            stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, 
taskInstance);
+            stateWheelExecuteThread.removeTask4RetryCheck(processInstance, 
taskInstance);
+            stateWheelExecuteThread.removeTask4StateCheck(processInstance, 
taskInstance);
+
+            if (taskInstance.getState().typeIsSuccess()) {
+                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                // todo: merge the last taskInstance
+                processInstance.setVarPool(taskInstance.getVarPool());
+                processService.saveProcessInstance(processInstance);
+                if (!processInstance.isBlocked()) {
+                    submitPostNode(Long.toString(taskInstance.getTaskCode()));
+                }
+            } else if (taskInstance.taskCanRetry() && 
processInstance.getState() != ExecutionStatus.READY_STOP) {
+                // retry task
+                logger.info("Retry taskInstance taskInstance state: {}", 
taskInstance.getState());
+                retryTaskInstance(taskInstance);
+            } else if (taskInstance.getState().typeIsFailure()) {
+                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                // There are child nodes and the failure policy is: CONTINUE
+                if (processInstance.getFailureStrategy() == 
FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
+                    Long.toString(taskInstance.getTaskCode()),
+                    dag)) {
+                    submitPostNode(Long.toString(taskInstance.getTaskCode()));
+                } else {
+                    errorTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+                    if (processInstance.getFailureStrategy() == 
FailureStrategy.END) {
+                        killAllTasks();
+                    }
                 }
+            } else if (taskInstance.getState().typeIsFinished()) {
+                // todo: when the task instance type is pause, then it should 
not in completeTaskMap
+                completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
             }
-        } else if (taskInstance.getState().typeIsFinished()) {
-            // todo: when the task instance type is pause, then it should not 
in completeTaskMap
-            completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
+            logger.info("TaskInstance finished will try to update the workflow 
instance state, task code:{} state:{}",
+                        taskInstance.getTaskCode(),
+                        taskInstance.getState());
+            this.updateProcessInstanceState();
+        } catch (Exception ex) {
+            logger.error("Task finish failed, get a exception, will remove 
this taskInstance from completeTaskMap", ex);
+            // remove the task from complete map, so that we can finish in the 
next time.
+            completeTaskMap.remove(taskInstance.getTaskCode());
+            throw ex;
         }
-
-        this.updateProcessInstanceState();
     }
 
     /**
@@ -672,17 +654,29 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      */
     @Override
     public WorkflowSubmitStatue call() {
-        if (this.taskInstanceMap.size() > 0 || isStart) {
-            logger.warn("The workflow has already been started");
+        if (isStart()) {
+            // This case should not been happened
+            logger.warn("[WorkflowInstance-{}] The workflow has already been 
started", processInstance.getId());
             return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
         }
 
         try {
             LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-            buildFlowDag();
-            initTaskQueue();
-            submitPostNode(null);
-            isStart = true;
+            if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
+                buildFlowDag();
+                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
+                logger.info("workflowStatue changed to :{}", 
workflowRunnableStatus);
+            }
+            if (workflowRunnableStatus == 
WorkflowRunnableStatus.INITIALIZE_DAG) {
+                initTaskQueue();
+                workflowRunnableStatus = 
WorkflowRunnableStatus.INITIALIZE_QUEUE;
+                logger.info("workflowStatue changed to :{}", 
workflowRunnableStatus);
+            }
+            if (workflowRunnableStatus == 
WorkflowRunnableStatus.INITIALIZE_QUEUE) {
+                submitPostNode(null);
+                workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
+                logger.info("workflowStatue changed to :{}", 
workflowRunnableStatus);
+            }
             return WorkflowSubmitStatue.SUCCESS;
         } catch (Exception e) {
             logger.error("Start workflow error", e);
@@ -749,9 +743,6 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      * @throws Exception exception
      */
     private void buildFlowDag() throws Exception {
-        if (this.dag != null) {
-            return;
-        }
         processDefinition = 
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
 processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
@@ -886,9 +877,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             }
         }
         logger.info("Initialize task queue, dependFailedTaskMap: {}, 
completeTaskMap: {}, errorTaskMap: {}",
-            dependFailedTaskMap,
-            completeTaskMap,
-            errorTaskMap);
+                    dependFailedTaskMap,
+                    completeTaskMap,
+                    errorTaskMap);
     }
 
     /**
@@ -911,7 +902,11 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
 
             boolean submit = taskProcessor.action(TaskAction.SUBMIT);
             if (!submit) {
-                logger.error("process id:{} name:{} submit standby task id:{} 
name:{} failed!", processInstance.getId(), processInstance.getName(), 
taskInstance.getId(), taskInstance.getName());
+                logger.error("process id:{} name:{} submit standby task id:{} 
name:{} failed!",
+                             processInstance.getId(),
+                             processInstance.getName(),
+                             taskInstance.getId(),
+                             taskInstance.getName());
                 return Optional.empty();
             }
 
@@ -962,7 +957,10 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             }
             return Optional.of(taskInstance);
         } catch (Exception e) {
-            logger.error("submit standby task error", e);
+            logger.error("submit standby task error, taskCode: {}, 
taskInstanceId: {}",
+                         taskInstance.getTaskCode(),
+                         taskInstance.getId(),
+                         e);
             return Optional.empty();
         }
     }
@@ -1215,9 +1213,19 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      */
     private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
         Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
-        for (Integer taskInstanceId : completeTaskMap.values()) {
+        for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
+            Long taskConde = entry.getKey();
+            Integer taskInstanceId = entry.getValue();
             TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+            if (taskInstance == null) {
+                logger.warn("Cannot find the taskInstance from 
taskInstanceMap, taskInstanceId: {}, taskConde: {}",
+                            taskInstanceId,
+                            taskConde);
+                // This case will happen when we submit to db failed, then the 
taskInstanceId is 0
+                continue;
+            }
             
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), 
taskInstance);
+
         }
         return completeTaskInstanceMap;
     }
@@ -1429,6 +1437,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      */
     private boolean processFailed() {
         if (hasFailedTask()) {
+            logger.info("The current process has failed task, the current 
process failed");
             if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                 return true;
             }
@@ -1504,22 +1513,29 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
 
         if (activeTaskProcessorMaps.size() > 0 || hasRetryTaskInStandBy()) {
             // active task and retry task exists
-            return runningState(state);
+            ExecutionStatus executionStatus = runningState(state);
+            logger.info("The workflowInstance has task running, the 
workflowInstance status is {}", executionStatus);
+            return executionStatus;
         }
 
         // block
         if (state == ExecutionStatus.READY_BLOCK) {
-            return processReadyBlock();
+            ExecutionStatus executionStatus = processReadyBlock();
+            logger.info("The workflowInstance is ready to block, the 
workflowInstance status is {}", executionStatus);
         }
 
         // waiting thread
         if (hasWaitingThreadTask()) {
+            logger.info("The workflowInstance has waiting thread task, the 
workflow status is {}",
+                        ExecutionStatus.WAITING_THREAD);
             return ExecutionStatus.WAITING_THREAD;
         }
 
         // pause
         if (state == ExecutionStatus.READY_PAUSE) {
-            return processReadyPause();
+            ExecutionStatus executionStatus = processReadyPause();
+            logger.info("The workflowInstance is ready to pause, the workflow 
status is {}", executionStatus);
+            return executionStatus;
         }
 
         // stop
@@ -1527,15 +1543,18 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             List<TaskInstance> stopList = 
getCompleteTaskByState(ExecutionStatus.STOP);
             List<TaskInstance> killList = 
getCompleteTaskByState(ExecutionStatus.KILL);
             List<TaskInstance> failList = 
getCompleteTaskByState(ExecutionStatus.FAILURE);
+            ExecutionStatus executionStatus;
             if (CollectionUtils.isNotEmpty(stopList) || 
CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || 
!isComplementEnd()) {
-                return ExecutionStatus.STOP;
+                executionStatus = ExecutionStatus.STOP;
             } else {
-                return ExecutionStatus.SUCCESS;
+                executionStatus = ExecutionStatus.SUCCESS;
             }
+            logger.info("The workflowInstance is ready to stop, the workflow 
status is {}", executionStatus);
         }
 
         // process failure
         if (processFailed()) {
+            logger.info("The workflowInstance is failed, the workflow status 
is {}", ExecutionStatus.FAILURE);
             return ExecutionStatus.FAILURE;
         }
 
@@ -1579,15 +1598,21 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     private void updateProcessInstanceState() throws StateEventHandleException 
{
         ExecutionStatus state = getProcessInstanceState(processInstance);
         if (processInstance.getState() != state) {
+            logger.info("Update workflowInstance states, origin state: {}, 
target state: {}",
+                        processInstance.getState(),
+                        state);
             updateWorkflowInstanceStatesToDB(state);
 
             StateEvent stateEvent = new StateEvent();
             stateEvent.setExecutionStatus(processInstance.getState());
             stateEvent.setProcessInstanceId(this.processInstance.getId());
             stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
-            // this.processStateChangeHandler(stateEvent);
             // replace with `stateEvents`, make sure `WorkflowExecuteThread` 
can be deleted to avoid memory leaks
             this.stateEvents.add(stateEvent);
+        } else {
+            logger.info("There is no need to update the workflow instance 
state, origin state: {}, target state: {}",
+                        processInstance.getState(),
+                        state);
         }
     }
 
@@ -1602,12 +1627,9 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
     private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) 
throws StateEventHandleException {
         ExecutionStatus originStates = processInstance.getState();
         if (originStates != newStates) {
-            logger.info("work flow process instance [id: {}, name:{}], state 
change from {} to {}, cmd type: {}",
-                processInstance.getId(),
-                processInstance.getName(),
-                originStates,
-                newStates,
-                processInstance.getCommandType());
+            logger.info("Begin to update workflow instance state , state will 
change from {} to {}",
+                        originStates,
+                        newStates);
 
             processInstance.setState(newStates);
             if (newStates.typeIsFinished()) {
@@ -1657,8 +1679,8 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
      *
      * @param taskInstance task instance
      */
-    private void removeTaskFromStandbyList(TaskInstance taskInstance) {
-        readyToSubmitTaskQueue.remove(taskInstance);
+    private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
+        return readyToSubmitTaskQueue.remove(taskInstance);
     }
 
     /**
@@ -1748,10 +1770,20 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
                 if (!taskInstanceOptional.isPresent()) {
                     this.taskFailedSubmit = true;
                     // Remove and add to complete map and error map
-                    removeTaskFromStandbyList(task);
+                    if (!removeTaskFromStandbyList(task)) {
+                        logger.error(
+                            "Task submit failed, remove from standby list 
failed, workflowInstanceId: {}, taskCode: {}",
+                            processInstance.getId(),
+                            task.getTaskCode());
+                    }
                     completeTaskMap.put(task.getTaskCode(), task.getId());
+                    taskInstanceMap.put(task.getId(), task);
                     errorTaskMap.put(task.getTaskCode(), task.getId());
-                    logger.error("Task submitted failed, processInstanceId: 
{}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
+                    activeTaskProcessorMaps.remove(task.getTaskCode());
+                    logger.error("Task submitted failed, workflowInstanceId: 
{}, taskInstanceId: {}, taskCode: {}",
+                                 task.getProcessInstanceId(),
+                                 task.getId(),
+                                 task.getTaskCode());
                 } else {
                     removeTaskFromStandbyList(task);
                 }
@@ -1927,4 +1959,10 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
         }
     }
 
+    /**
+     * Start-up progress of this runnable: call() moves it through CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE and
+     * STARTED, and a later call() resumes after the last step that completed; isStart() is true only in STARTED.
+     */
+    private enum WorkflowRunnableStatus { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }
+
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
index 3d33d8c363..0cd81dc3e2 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.remote.command;
 
-import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5024a944f1..075991c955 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1284,8 +1284,8 @@ public class ProcessServiceImpl implements ProcessService 
{
                     break;
                 }
                 logger.error(
-                    "task commit to db failed , taskId {} has already retry {} 
times, please check the database",
-                    taskInstance.getId(),
+                    "task commit to db failed , taskCode: {} has already retry 
{} times, please check the database",
+                    taskInstance.getTaskCode(),
                     retryTimes);
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
@@ -1298,6 +1298,7 @@ public class ProcessServiceImpl implements ProcessService 
{
     }
 
     /**
+     * // todo: This method need to refactor, we find when the db down, but 
the taskInstanceId is not 0. It's better to change to void, rather than return 
TaskInstance
      * submit task to db
      * submit sub process to command
      *
@@ -1316,9 +1317,9 @@ public class ProcessServiceImpl implements ProcessService 
{
         TaskInstance task = submitTaskInstanceToDB(taskInstance, 
processInstance);
         if (task == null) {
             logger.error("Save taskInstance to db error, task name:{}, process 
id:{} state: {} ",
-                taskInstance.getName(),
-                taskInstance.getProcessInstance(),
-                processInstance.getState());
+                         taskInstance.getName(),
+                         taskInstance.getProcessInstance().getId(),
+                         processInstance.getState());
             return null;
         }
 
@@ -1328,8 +1329,8 @@ public class ProcessServiceImpl implements ProcessService 
{
 
         logger.info(
             "End save taskInstance to db successfully:{}, taskInstanceName: 
{}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
-            taskInstance.getId(),
-            taskInstance.getName(),
+            task.getId(),
+            task.getName(),
             task.getState(),
             processInstance.getId(),
             processInstance.getState());
@@ -1564,7 +1565,10 @@ public class ProcessServiceImpl implements 
ProcessService {
     public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, 
ProcessInstance processInstance) {
         ExecutionStatus processInstanceState = processInstance.getState();
         if (processInstanceState.typeIsFinished() || processInstanceState == 
ExecutionStatus.READY_STOP) {
-            logger.warn("processInstance {} was {}, skip submit task", 
processInstance.getProcessDefinitionCode(), processInstanceState);
+            logger.warn("processInstance: {} state was: {}, skip submit this 
task, taskCode: {}",
+                        processInstance.getId(),
+                        processInstanceState,
+                        taskInstance.getTaskCode());
             return null;
         }
         if (processInstanceState == ExecutionStatus.READY_PAUSE) {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index fb3c84da68..f8e47fc43a 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.cache;
 
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.remote.command.Command;
 
 import java.util.Map;
@@ -48,7 +48,7 @@ public class ResponseCache {
      * @param command command
      * @param event event ACK/RESULT
      */
-    public void cache(Integer taskInstanceId, Command command, Event event) {
+    public void cache(Integer taskInstanceId, Command command, TaskEventType 
event) {
         switch (event) {
             case RUNNING:
                 runningCache.put(taskInstanceId, command);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3f70974344..21616a1f60 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
-import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@@ -236,7 +236,7 @@ public class TaskCallbackService {
     public void sendTaskExecuteRunningCommand(TaskExecutionContext 
taskExecutionContext) {
         TaskExecuteRunningCommand command = 
buildTaskExecuteRunningCommand(taskExecutionContext);
         // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), Event.RUNNING);
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), TaskEventType.RUNNING);
         send(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command());
     }
 
@@ -256,7 +256,7 @@ public class TaskCallbackService {
     public void sendTaskExecuteResponseCommand(TaskExecutionContext 
taskExecutionContext) {
         TaskExecuteResponseCommand command = 
buildTaskExecuteResponseCommand(taskExecutionContext);
         // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), Event.RESULT);
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), TaskEventType.RESULT);
         send(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command());
     }
 
@@ -270,7 +270,7 @@ public class TaskCallbackService {
      */
     public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
         TaskRecallCommand taskRecallCommand = 
buildRecallCommand(taskExecutionContext);
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
taskRecallCommand.convert2Command(), Event.WORKER_REJECT);
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
         send(taskExecutionContext.getTaskInstanceId(), 
taskRecallCommand.convert2Command());
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index f691f7c8de..73b8d84cf7 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -84,6 +84,7 @@ public class RetryReportTaskStatusThread extends 
BaseDaemonThread {
     private void retryRunningCommand(ResponseCache instance) {
         if (!instance.getRunningCache().isEmpty()) {
             Map<Integer, Command> runningCache = instance.getRunningCache();
+            logger.info("Send task running retry command starting, waiting to 
retry size: {}", runningCache.size());
             for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
                 Integer taskInstanceId = entry.getKey();
                 Command runningCommand = entry.getValue();
@@ -93,12 +94,14 @@ public class RetryReportTaskStatusThread extends 
BaseDaemonThread {
                     logger.error("Retry send running command to master error, 
taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
                 }
             }
+            logger.info("Send task running retry command finished, waiting to 
retry size: {}", runningCache.size());
         }
     }
 
     private void retryResponseCommand(ResponseCache instance) {
-        if (!instance.getResponseCache().isEmpty()) {
-            Map<Integer, Command> responseCache = instance.getResponseCache();
+        Map<Integer, Command> responseCache = instance.getResponseCache();
+        if (!responseCache.isEmpty()) {
+            logger.info("Send task response retry command starting, waiting to 
retry size: {}", responseCache.size());
             for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) 
{
                 Integer taskInstanceId = entry.getKey();
                 Command responseCommand = entry.getValue();
@@ -108,12 +111,14 @@ public class RetryReportTaskStatusThread extends 
BaseDaemonThread {
                     logger.error("Retry send response command to master error, 
taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
                 }
             }
+            logger.info("Send task response retry command finished, waiting to 
retry size: {}", responseCache.size());
         }
     }
 
     private void retryRecallCommand(ResponseCache instance) {
-        if (!instance.getRecallCache().isEmpty()) {
-            Map<Integer, Command> recallCache = instance.getRecallCache();
+        Map<Integer, Command> recallCache = instance.getRecallCache();
+        if (!recallCache.isEmpty()) {
+            logger.info("Send task recall retry command starting, waiting to 
retry size: {}", recallCache.size());
             for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
                 Integer taskInstanceId = entry.getKey();
                 Command responseCommand = entry.getValue();
@@ -123,6 +128,7 @@ public class RetryReportTaskStatusThread extends 
BaseDaemonThread {
                     logger.error("Retry send recall command to master error, 
taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
                 }
             }
+            logger.info("Send task recall retry command finished, waiting to 
retry size: {}", recallCache.size());
         }
     }
 }

Reply via email to