This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2119e41800 [Improvement] Move delay calculation to Master (#15278)
2119e41800 is described below

commit 2119e41800de0b0df7341e166a48fd8283fab98e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 09:37:14 2023 +0800

    [Improvement] Move delay calculation to Master (#15278)
---
 .../master/event/TaskTimeoutStateEventHandler.java |  2 +-
 ...ogicITaskInstanceDispatchOperationFunction.java | 43 ++++---------
 .../LogicITaskInstanceKillOperationFunction.java   | 22 +++----
 .../LogicITaskInstancePauseOperationFunction.java  | 14 ++---
 .../server/master/runner/BaseTaskDispatcher.java   |  1 -
 .../master/runner/BaseTaskExecuteRunnable.java     | 55 ++++++++++++++++
 .../{execute => }/DefaultTaskExecuteRunnable.java  |  4 +-
 ...a => GlobalMasterTaskExecuteRunnableQueue.java} | 32 ++++++----
 ...lobalMasterTaskExecuteRunnableQueueLooper.java} | 24 +++----
 .../runner/GlobalTaskDispatchWaitingQueue.java     |  6 +-
 .../GlobalTaskDispatchWaitingQueueLooper.java      |  1 -
 .../MasterDelayTaskExecuteRunnableDelayQueue.java  | 54 ----------------
 .../master/runner/MasterTaskExecutorBootstrap.java |  8 +--
 ....java => PriorityDelayTaskExecuteRunnable.java} | 73 ++++++++--------------
 .../master/runner/StreamTaskExecuteRunnable.java   |  2 -
 .../runner/{execute => }/TaskExecuteRunnable.java  |  4 +-
 .../{execute => }/TaskExecuteRunnableFactory.java  |  7 ++-
 .../{execute => }/TaskExecutionContextFactory.java |  2 +-
 .../master/runner/WorkflowExecuteRunnable.java     |  2 -
 .../runner/dispatcher/MasterTaskDispatcher.java    |  2 +-
 .../master/runner/dispatcher/TaskDispatcher.java   |  2 +-
 .../runner/dispatcher/WorkerTaskDispatcher.java    |  2 +-
 ...eRunnable.java => AsyncMasterTaskExecutor.java} | 10 +--
 ...ry.java => AsyncMasterTaskExecutorFactory.java} |  8 +--
 .../execute/AsyncTaskCallbackFunctionImpl.java     | 22 +++----
 .../execute/DefaultTaskExecuteRunnableFactory.java |  3 +
 .../execute/MasterDelayTaskExecuteRunnable.java    | 68 --------------------
 ...xecuteRunnable.java => MasterTaskExecutor.java} | 10 +--
 ...Factory.java => MasterTaskExecutorFactory.java} |  4 +-
 ....java => MasterTaskExecutorFactoryBuilder.java} | 15 +++--
 ...leHolder.java => MasterTaskExecutorHolder.java} | 14 ++---
 ...Pool.java => MasterTaskExecutorThreadPool.java} |  6 +-
 ...teRunnable.java => SyncMasterTaskExecutor.java} | 10 +--
 ...ory.java => SyncMasterTaskExecutorFactory.java} |  8 +--
 .../BaseTaskExecuteRunnableDispatchOperator.java   | 26 +++++++-
 .../BaseTaskExecuteRunnableKillOperator.java       |  2 +-
 .../BaseTaskExecuteRunnablePauseOperator.java      |  2 +-
 .../BaseTaskExecuteRunnableTimeoutOperator.java    |  2 +-
 .../LogicTaskExecuteRunnableDispatchOperator.java  |  6 +-
 .../TaskExecuteRunnableDispatchOperator.java       | 14 ++---
 .../operator/TaskExecuteRunnableOperator.java      |  2 +-
 .../TaskExecuteRunnableOperatorManager.java        |  2 +-
 .../operator/TaskExecuteRunnablePauseOperator.java |  2 +-
 .../master/runner/WorkflowExecuteRunnableTest.java |  1 -
 .../dispatcher/MasterTaskDispatcherTest.java       |  2 +-
 .../dispatcher/WorkerTaskDispatcherTest.java       |  2 +-
 ...a => PriorityDelayTaskExecuteRunnableTest.java} |  8 ++-
 .../shell/BaseLinuxShellInterceptorBuilder.java    |  7 ++-
 .../server/worker/WorkerServer.java                |  6 +-
 .../worker/registry/WorkerWaitingStrategy.java     |  6 +-
 .../rpc/StreamingTaskInstanceOperatorImpl.java     |  8 +--
 ...faultWorkerDelayTaskExecuteRunnableFactory.java | 59 -----------------
 ...unnable.java => DefaultWorkerTaskExecutor.java} | 14 ++---
 ....java => DefaultWorkerTaskExecutorFactory.java} | 44 +++++++------
 ...ue.java => GlobalTaskInstanceWaitingQueue.java} |  4 +-
 ...a => GlobalTaskInstanceWaitingQueueLooper.java} | 35 +++--------
 .../runner/WorkerDelayTaskExecuteRunnable.java     | 69 --------------------
 .../server/worker/runner/WorkerExecService.java    | 14 ++---
 .../server/worker/runner/WorkerManagerThread.java  | 23 +++----
 ...xecuteRunnable.java => WorkerTaskExecutor.java} | 18 +++---
 ...Factory.java => WorkerTaskExecutorFactory.java} |  4 +-
 ....java => WorkerTaskExecutorFactoryBuilder.java} | 16 ++---
 .../TaskInstanceDispatchOperationFunction.java     |  6 +-
 .../TaskInstanceKillOperationFunction.java         |  8 +--
 ...est.java => DefaultWorkerTaskExecutorTest.java} | 10 +--
 65 files changed, 370 insertions(+), 592 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 16476ad7f7..c04eb9c338 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
 import java.util.Map;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
index c60e860aa7..f69ff4fb77 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
@@ -17,18 +17,14 @@
 
 package org.apache.dolphinscheduler.server.master.rpc;
 
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest;
 import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import 
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder;
+import 
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
-
-import java.util.concurrent.TimeUnit;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -42,10 +38,10 @@ public class LogicITaskInstanceDispatchOperationFunction
             ITaskInstanceOperationFunction<LogicTaskDispatchRequest, 
LogicTaskDispatchResponse> {
 
     @Autowired
-    private MasterTaskExecuteRunnableFactoryBuilder 
masterTaskExecuteRunnableFactoryBuilder;
+    private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
 
     @Autowired
-    private MasterDelayTaskExecuteRunnableDelayQueue 
masterDelayTaskExecuteRunnableDelayQueue;
+    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
 
     @Override
     public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest 
taskDispatchRequest) {
@@ -63,34 +59,17 @@ public class LogicITaskInstanceDispatchOperationFunction
 
             
MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
 
-            int delayTime = taskExecutionContext.getDelayTime();
-            if (delayTime > 0) {
-                // todo: calculate the delay in master dispatcher then we 
don't need to use a queue to store the task
-                final long remainTime =
-                        
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
-                                TimeUnit.SECONDS.toMillis(delayTime));
-                if (remainTime > 0) {
-                    log.info(
-                            "Current taskInstance: {} is choosing delay 
execution, delay time: {}/ms, remainTime: {}/ms",
-                            taskExecutionContext.getTaskName(),
-                            
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
-                    
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
-                    // todo: send delay execution message
-                    return 
LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
-                }
-            }
-            final MasterDelayTaskExecuteRunnable 
masterDelayTaskExecuteRunnable =
-                    masterTaskExecuteRunnableFactoryBuilder
-                            
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())
-                            
.createWorkerTaskExecuteRunnable(taskExecutionContext);
-            if (masterDelayTaskExecuteRunnableDelayQueue
-                    
.submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) {
+            MasterTaskExecutor masterTaskExecutor = 
masterTaskExecutorFactoryBuilder
+                    
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
+                    .createMasterTaskExecutor(taskExecutionContext);
+            if (globalMasterTaskExecuteRunnableQueue
+                    .submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
                 log.info("Submit LogicTask: {} to 
MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
                 return LogicTaskDispatchResponse.success(taskInstanceId);
             } else {
                 log.error(
                         "Submit LogicTask: {} to 
MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue 
size: {} is full",
-                        taskInstanceName, 
masterDelayTaskExecuteRunnableDelayQueue.size());
+                        taskInstanceName, 
globalMasterTaskExecuteRunnableQueue.size());
                 return LogicTaskDispatchResponse.failed(taskInstanceId,
                         "MasterDelayTaskExecuteRunnableDelayQueue is full");
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
index 6b43a7690a..27fc52333d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
@@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
 import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import 
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
             ITaskInstanceOperationFunction<LogicTaskKillRequest, 
LogicTaskKillResponse> {
 
     @Autowired
-    private MasterDelayTaskExecuteRunnableDelayQueue 
masterDelayTaskExecuteRunnableDelayQueue;
+    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
 
     @Override
     public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) 
{
@@ -46,16 +46,16 @@ public class LogicITaskInstanceKillOperationFunction
         try {
             LogUtils.setTaskInstanceIdMDC(taskKillRequest.getTaskInstanceId());
             log.info("Received killLogicTask request: {}", taskKillRequest);
-            final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-                    
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
-            if (masterTaskExecuteRunnable == null) {
+            final MasterTaskExecutor masterTaskExecutor =
+                    
MasterTaskExecutorHolder.getMasterTaskExecutor(taskInstanceId);
+            if (masterTaskExecutor == null) {
                 log.error("Cannot find the MasterTaskExecuteRunnable, this 
task may already been killed");
                 return LogicTaskKillResponse.fail("Cannot find the 
MasterTaskExecuteRunnable");
             }
             try {
-                masterTaskExecuteRunnable.cancelTask();
-                masterDelayTaskExecuteRunnableDelayQueue
-                        
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
+                masterTaskExecutor.cancelTask();
+                globalMasterTaskExecuteRunnableQueue
+                        .removeMasterTaskExecuteRunnable(masterTaskExecutor);
                 return LogicTaskKillResponse.success();
             } catch (MasterTaskExecuteException e) {
                 log.error("Cancel MasterTaskExecuteRunnable failed ", e);
@@ -63,7 +63,7 @@ public class LogicITaskInstanceKillOperationFunction
             } finally {
                 // todo: If cancel failed, we cannot remove the context?
                 
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
-                
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
+                
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId);
             }
         } finally {
             LogUtils.removeTaskInstanceIdMDC();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
index 912193b91e..a95d2fc667 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResp
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -39,16 +39,16 @@ public class LogicITaskInstancePauseOperationFunction
     public LogicTaskPauseResponse operate(LogicTaskPauseRequest 
taskPauseRequest) {
         try {
             
LogUtils.setTaskInstanceIdMDC(taskPauseRequest.getTaskInstanceId());
-            final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-                    
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
-            if (masterTaskExecuteRunnable == null) {
+            final MasterTaskExecutor masterTaskExecutor =
+                    
MasterTaskExecutorHolder.getMasterTaskExecutor(taskPauseRequest.getTaskInstanceId());
+            if (masterTaskExecutor == null) {
                 log.info("Cannot find the MasterTaskExecuteRunnable");
                 return LogicTaskPauseResponse.fail("Cannot find the 
MasterTaskExecuteRunnable");
             }
-            final TaskExecutionContext taskExecutionContext = 
masterTaskExecuteRunnable.getTaskExecutionContext();
+            final TaskExecutionContext taskExecutionContext = 
masterTaskExecutor.getTaskExecutionContext();
             
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId());
-            masterTaskExecuteRunnable.pauseTask();
+            masterTaskExecutor.pauseTask();
             return LogicTaskPauseResponse.success();
         } catch (MasterTaskExecuteException e) {
             log.error("Pause MasterTaskExecuteRunnable failed", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
index 0ad515e93a..30ab8fadec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.server.master.exception.TaskDispatchException
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 
 import java.util.Date;
 import java.util.Optional;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
new file mode 100644
index 0000000000..fefdbf3493
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable {
+
+    protected final ProcessInstance workflowInstance;
+    protected final TaskInstance taskInstance;
+    protected final TaskExecutionContext taskExecutionContext;
+
+    public BaseTaskExecuteRunnable(ProcessInstance workflowInstance,
+                                   TaskInstance taskInstance,
+                                   TaskExecutionContext taskExecutionContext) {
+        this.taskInstance = checkNotNull(taskInstance);
+        this.workflowInstance = checkNotNull(workflowInstance);
+        this.taskExecutionContext = checkNotNull(taskExecutionContext);
+    }
+
+    @Override
+    public ProcessInstance getWorkflowInstance() {
+        return workflowInstance;
+    }
+
+    @Override
+    public TaskInstance getTaskInstance() {
+        return taskInstance;
+    }
+
+    @Override
+    public TaskExecutionContext getTaskExecutionContext() {
+        return taskExecutionContext;
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
similarity index 94%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
index 6f736139b6..c1b13717bd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
 
-public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable {
+public class DefaultTaskExecuteRunnable extends 
PriorityDelayTaskExecuteRunnable {
 
     private final TaskExecuteRunnableOperatorManager 
taskExecuteRunnableOperatorManager;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
similarity index 53%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
index a8f8f88498..9005416b92 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
@@ -17,30 +17,36 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
 
-import java.util.concurrent.PriorityBlockingQueue;
-
-import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.springframework.stereotype.Component;
 
-@Slf4j
+/**
+ *
+ */
 @Component
-public class GlobalTaskDispatchWaitingQueue {
+public class GlobalMasterTaskExecuteRunnableQueue {
 
-    private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue = 
new PriorityBlockingQueue<>();
+    private final BlockingQueue<MasterTaskExecutor> 
masterTaskExecutorBlockingQueue =
+            new LinkedBlockingQueue<>();
+
+    public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor 
masterTaskExecutor) {
+        return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
+    }
 
-    public void 
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable 
priorityTaskExecuteRunnable) {
-        queue.put(priorityTaskExecuteRunnable);
+    public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws 
InterruptedException {
+        return masterTaskExecutorBlockingQueue.take();
     }
 
-    public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable() 
throws InterruptedException {
-        return queue.take();
+    public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor 
masterTaskExecutor) {
+        return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor);
     }
 
-    public int getWaitingDispatchTaskNumber() {
-        return queue.size();
+    public int size() {
+        return masterTaskExecutorBlockingQueue.size();
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
similarity index 74%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
index 557f2ca447..e0b1f80704 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
@@ -18,9 +18,9 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -31,17 +31,17 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends 
BaseDaemonThread implements AutoCloseable {
+public class GlobalMasterTaskExecuteRunnableQueueLooper extends 
BaseDaemonThread implements AutoCloseable {
 
     @Autowired
-    private MasterDelayTaskExecuteRunnableDelayQueue 
masterDelayTaskExecuteRunnableDelayQueue;
+    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
 
     @Autowired
-    private MasterTaskExecuteRunnableThreadPool 
masterTaskExecuteRunnableThreadPool;
+    private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;
 
     private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
 
-    public MasterDelayTaskExecuteRunnableDelayQueueLooper() {
+    public GlobalMasterTaskExecuteRunnableQueueLooper() {
         super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
     }
 
@@ -53,7 +53,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonTh
         }
         log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
         super.start();
-        masterTaskExecuteRunnableThreadPool.start();
+        masterTaskExecutorThreadPool.start();
         log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
     }
 
@@ -61,10 +61,10 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonTh
     public void run() {
         while (RUNNING_FLAG.get()) {
             try {
-                final MasterDelayTaskExecuteRunnable 
masterDelayTaskExecuteRunnable =
-                        
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
-                
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
-                
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
+                final MasterTaskExecutor masterTaskExecutor =
+                        
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
+                
masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
+                
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
             } catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
                 log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has 
been interrupted, will stop loop");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index a8f8f88498..f4d50537c6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -17,9 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
-
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.DelayQueue;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -29,7 +27,7 @@ import org.springframework.stereotype.Component;
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
-    private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue = 
new PriorityBlockingQueue<>();
+    private final DelayQueue<DefaultTaskExecuteRunnable> queue = new 
DelayQueue<>();
 
     public void 
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable 
priorityTaskExecuteRunnable) {
         queue.put(priorityTaskExecuteRunnable);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index c54aa5aff8..b496bea5a5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
deleted file mode 100644
index bdd1510527..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-
-import java.util.concurrent.DelayQueue;
-
-import org.springframework.stereotype.Component;
-
-/**
- *
- */
-@Component
-public class MasterDelayTaskExecuteRunnableDelayQueue {
-
-    private final DelayQueue<MasterDelayTaskExecuteRunnable> 
masterDelayTaskExecuteRunnableDelayQueue =
-            new DelayQueue<>();
-
-    public boolean 
submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable 
masterDelayTaskExecuteRunnable) {
-        return 
masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable);
-    }
-
-    public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable() 
throws InterruptedException {
-        return masterDelayTaskExecuteRunnableDelayQueue.take();
-    }
-
-    // todo: if we move the delay process to master, than we don't need this 
method, since dispatchProcess can directly
-    // submit to thread pool
-    public boolean 
removeMasterDelayTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
-        return 
masterDelayTaskExecuteRunnableDelayQueue.remove(masterTaskExecuteRunnable);
-    }
-
-    public int size() {
-        return masterDelayTaskExecuteRunnableDelayQueue.size();
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
index 744e560c00..3e99d2141c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
@@ -32,7 +32,7 @@ public class MasterTaskExecutorBootstrap implements 
AutoCloseable {
     private GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper;
 
     @Autowired
-    private MasterDelayTaskExecuteRunnableDelayQueueLooper 
masterDelayTaskExecuteRunnableDelayQueueLooper;
+    private GlobalMasterTaskExecuteRunnableQueueLooper 
globalMasterTaskExecuteRunnableQueueLooper;
 
     @Autowired
     private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;
@@ -40,7 +40,7 @@ public class MasterTaskExecutorBootstrap implements 
AutoCloseable {
     public synchronized void start() {
         log.info("MasterTaskExecutorBootstrap starting...");
         globalTaskDispatchWaitingQueueLooper.start();
-        masterDelayTaskExecuteRunnableDelayQueueLooper.start();
+        globalMasterTaskExecuteRunnableQueueLooper.start();
         asyncMasterTaskDelayQueueLooper.start();
         log.info("MasterTaskExecutorBootstrap started...");
     }
@@ -51,8 +51,8 @@ public class MasterTaskExecutorBootstrap implements 
AutoCloseable {
         try (
                 final GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper1 =
                         globalTaskDispatchWaitingQueueLooper;
-                final MasterDelayTaskExecuteRunnableDelayQueueLooper 
masterDelayTaskExecuteRunnableDelayQueueLooper1 =
-                        masterDelayTaskExecuteRunnableDelayQueueLooper;
+                final GlobalMasterTaskExecuteRunnableQueueLooper 
globalMasterTaskExecuteRunnableQueueLooper1 =
+                        globalMasterTaskExecuteRunnableQueueLooper;
                 final AsyncMasterTaskDelayQueueLooper 
asyncMasterTaskDelayQueueLooper1 =
                         asyncMasterTaskDelayQueueLooper) {
             // closed the resource
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
similarity index 57%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
index 2e23feb7bd..255ec6c8ac 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
@@ -15,62 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+package org.apache.dolphinscheduler.server.master.runner;
 
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
-import org.jetbrains.annotations.NotNull;
-
-public abstract class PriorityTaskExecuteRunnable implements 
TaskExecuteRunnable, Comparable<TaskExecuteRunnable> {
-
-    private final ProcessInstance workflowInstance;
-    private final TaskInstance taskInstance;
-    private final TaskExecutionContext taskExecutionContext;
-
-    public PriorityTaskExecuteRunnable(ProcessInstance workflowInstance,
-                                       TaskInstance taskInstance,
-                                       TaskExecutionContext 
taskExecutionContext) {
-        this.taskInstance = checkNotNull(taskInstance);
-        this.workflowInstance = checkNotNull(workflowInstance);
-        this.taskExecutionContext = checkNotNull(taskExecutionContext);
-    }
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
 
-    @Override
-    public ProcessInstance getWorkflowInstance() {
-        return workflowInstance;
-    }
+public abstract class PriorityDelayTaskExecuteRunnable extends 
BaseTaskExecuteRunnable implements Delayed {
 
-    @Override
-    public TaskInstance getTaskInstance() {
-        return taskInstance;
+    public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance,
+                                            TaskInstance taskInstance,
+                                            TaskExecutionContext 
taskExecutionContext) {
+        super(workflowInstance, taskInstance, taskExecutionContext);
     }
 
     @Override
-    public TaskExecutionContext getTaskExecutionContext() {
-        return taskExecutionContext;
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(
+                
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+                        taskExecutionContext.getDelayTime() * 60L),
+                TimeUnit.SECONDS);
     }
 
     @Override
-    public int compareTo(@NotNull TaskExecuteRunnable other) {
+    public int compareTo(Delayed o) {
+        if (o == null) {
+            return 1;
+        }
+        int delayTimeCompareResult =
+                Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+        if (delayTimeCompareResult != 0) {
+            return delayTimeCompareResult;
+        }
+        PriorityDelayTaskExecuteRunnable other = 
(PriorityDelayTaskExecuteRunnable) o;
         // the smaller dispatch fail times, the higher priority
         int dispatchFailTimesCompareResult = 
taskExecutionContext.getDispatchFailTimes()
                 - other.getTaskExecutionContext().getDispatchFailTimes();
         if (dispatchFailTimesCompareResult != 0) {
             return dispatchFailTimesCompareResult;
         }
-
         int workflowInstancePriorityCompareResult = 
workflowInstance.getProcessInstancePriority().getCode()
                 - 
other.getWorkflowInstance().getProcessInstancePriority().getCode();
         if (workflowInstancePriorityCompareResult != 0) {
             return workflowInstancePriorityCompareResult;
         }
-        int workflowInstanceIdCompareResult = workflowInstance.getId() - 
other.getWorkflowInstance().getId();
+        long workflowInstanceIdCompareResult = 
workflowInstance.getId().compareTo(other.getWorkflowInstance().getId());
         if (workflowInstanceIdCompareResult != 0) {
-            return workflowInstanceIdCompareResult;
+            return workflowInstancePriorityCompareResult;
         }
         int taskInstancePriorityCompareResult = 
taskInstance.getTaskInstancePriority().getCode()
                 - other.getTaskInstance().getTaskInstancePriority().getCode();
@@ -84,21 +79,7 @@ public abstract class PriorityTaskExecuteRunnable implements 
TaskExecuteRunnable
             return -taskGroupPriorityCompareResult;
         }
         // The task instance shouldn't be equals
-        return taskInstance.getId() - other.getTaskInstance().getId();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof PriorityTaskExecuteRunnable) {
-            PriorityTaskExecuteRunnable other = (PriorityTaskExecuteRunnable) 
obj;
-            return compareTo(other) == 0;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return taskInstance.getId();
+        return taskInstance.getId().compareTo(other.getTaskInstance().getId());
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 24e524d171..ab6fc555a6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -57,9 +57,7 @@ import 
org.apache.dolphinscheduler.server.master.event.StateEventHandleException
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
similarity index 90%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
index 02980aff49..8f66189617 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -25,7 +25,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
  * This interface is used to define a task which is executing.
  * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
  */
-public interface TaskExecuteRunnable extends Comparable<TaskExecuteRunnable> {
+public interface TaskExecuteRunnable {
 
     void dispatch();
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
similarity index 89%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
index e3afb2d6b3..43ee971160 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
@@ -15,11 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException;
 
+/**
+ * Use to create TaskExecuteRunnable
+ *
+ * @param <T> TaskExecuteRunnable
+ */
 public interface TaskExecuteRunnableFactory<T extends TaskExecuteRunnable> {
 
     T createTaskExecuteRunnable(TaskInstance taskInstance) throws 
TaskExecuteRunnableCreateException;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
similarity index 99%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
index d8f72564bd..3fb2a9c850 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.ADDRESS;
 import static org.apache.dolphinscheduler.common.constants.Constants.DATABASE;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 00c1515648..35ad1e8c82 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -80,9 +80,7 @@ import 
org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
 import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
index d30a1e554d..cca7112975 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
@@ -27,7 +27,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
 
 import java.util.Optional;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
index 32a195fb5a..f595d5a490 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
@@ -19,7 +19,7 @@ package 
org.apache.dolphinscheduler.server.master.runner.dispatcher;
 
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
 
 /**
  * Used to do task dispatcher.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
index 36739a1163..3320082694 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
@@ -31,7 +31,7 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
 
 import java.util.Optional;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
similarity index 82%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
index 8a6e6d8e87..d289a318da 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
@@ -26,14 +26,14 @@ import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class AsyncMasterDelayTaskExecuteRunnable extends 
MasterDelayTaskExecuteRunnable {
+public class AsyncMasterTaskExecutor extends MasterTaskExecutor {
 
     private final AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
 
-    public AsyncMasterDelayTaskExecuteRunnable(TaskExecutionContext 
taskExecutionContext,
-                                               LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
-                                               
LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager,
-                                               AsyncMasterTaskDelayQueue 
asyncTaskDelayQueue) {
+    public AsyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+                                   LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
+                                   
LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager,
+                                   AsyncMasterTaskDelayQueue 
asyncTaskDelayQueue) {
         super(taskExecutionContext, logicTaskPluginFactoryBuilder, 
logicTaskInstanceExecutionEventSenderManager);
         this.asyncMasterTaskDelayQueue = asyncTaskDelayQueue;
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
similarity index 83%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
index a71f394b7d..6c81cd7862 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
@@ -25,9 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class AsyncMasterDelayTaskExecuteRunnableFactory
+public class AsyncMasterTaskExecutorFactory
         implements
-            
MasterDelayTaskExecuteRunnableFactory<AsyncMasterDelayTaskExecuteRunnable> {
+            MasterTaskExecutorFactory<AsyncMasterTaskExecutor> {
 
     @Autowired
     private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder;
@@ -39,8 +39,8 @@ public class AsyncMasterDelayTaskExecuteRunnableFactory
     private AsyncMasterTaskDelayQueue asyncTaskDelayQueue;
 
     @Override
-    public AsyncMasterDelayTaskExecuteRunnable 
createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
-        return new AsyncMasterDelayTaskExecuteRunnable(taskExecutionContext,
+    public AsyncMasterTaskExecutor 
createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) {
+        return new AsyncMasterTaskExecutor(taskExecutionContext,
                 logicTaskPluginFactoryBuilder,
                 logicTaskInstanceExecutionEventSenderManager,
                 asyncTaskDelayQueue);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
index e9d4fe4430..6c83a3ccb9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
@@ -26,41 +26,41 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class AsyncTaskCallbackFunctionImpl implements 
AsyncTaskCallbackFunction {
 
-    private final AsyncMasterDelayTaskExecuteRunnable 
asyncMasterDelayTaskExecuteRunnable;
+    private final AsyncMasterTaskExecutor asyncMasterTaskExecuteRunnable;
 
-    public AsyncTaskCallbackFunctionImpl(@NonNull 
AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable) {
-        this.asyncMasterDelayTaskExecuteRunnable = 
asyncMasterDelayTaskExecuteRunnable;
+    public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterTaskExecutor 
asyncMasterTaskExecuteRunnable) {
+        this.asyncMasterTaskExecuteRunnable = asyncMasterTaskExecuteRunnable;
     }
 
     @Override
     public void executeSuccess() {
-        asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
+        asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
                 .setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
         executeFinished();
     }
 
     @Override
     public void executeFailed() {
-        asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
+        asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
                 .setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
         executeFinished();
     }
 
     @Override
     public void executeThrowing(Throwable throwable) {
-        asyncMasterDelayTaskExecuteRunnable.afterThrowing(throwable);
+        asyncMasterTaskExecuteRunnable.afterThrowing(throwable);
     }
 
     private void executeFinished() {
         TaskInstanceLogHeader.printFinalizeTaskHeader();
-        int taskInstanceId = 
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+        int taskInstanceId = 
asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
         
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
-        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
+        MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId);
         log.info("Task execute finished, removed the TaskExecutionContext");
-        asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
+        asyncMasterTaskExecuteRunnable.sendTaskResult();
         log.info(
                 "Execute task finished, will send the task execute result to 
master, the current task execute result is {}",
-                
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
-        asyncMasterDelayTaskExecuteRunnable.closeLogAppender();
+                
asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
+        asyncMasterTaskExecuteRunnable.closeLogAppender();
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
index 64443855fa..ab749b5861 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException;
 import 
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
deleted file mode 100644
index 70f3e93521..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager;
-import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-public abstract class MasterDelayTaskExecuteRunnable extends 
MasterTaskExecuteRunnable implements Delayed {
-
-    public MasterDelayTaskExecuteRunnable(TaskExecutionContext 
taskExecutionContext,
-                                          LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
-                                          
LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager) {
-        super(taskExecutionContext, logicTaskPluginFactoryBuilder, 
logicTaskInstanceExecutionEventSenderManager);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof MasterDelayTaskExecuteRunnable)) {
-            return false;
-        }
-        MasterDelayTaskExecuteRunnable other = 
(MasterDelayTaskExecuteRunnable) obj;
-        return other.getTaskExecutionContext().getTaskInstanceId() == 
this.getTaskExecutionContext()
-                .getTaskInstanceId();
-    }
-
-    @Override
-    public int hashCode() {
-        return this.getTaskExecutionContext().getTaskInstanceId();
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-        TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
-        return unit.convert(
-                DateUtils.getRemainTime(
-                        taskExecutionContext.getFirstSubmitTime(), 
taskExecutionContext.getDelayTime() * 60L),
-                TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-        if (o == null) {
-            return 1;
-        }
-        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
similarity index 94%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
index 9e08c1a4d2..8c1e2feaba 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
@@ -35,16 +35,16 @@ import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public abstract class MasterTaskExecuteRunnable implements Runnable {
+public abstract class MasterTaskExecutor implements Runnable {
 
     protected final TaskExecutionContext taskExecutionContext;
     protected final LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder;
     protected final LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager;
     protected ILogicTask logicTask;
 
-    public MasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
-                                     LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
-                                     
LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager) {
+    public MasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+                              LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
+                              LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager) {
         this.taskExecutionContext = taskExecutionContext;
         this.logicTaskPluginFactoryBuilder = logicTaskPluginFactoryBuilder;
         this.logicTaskInstanceExecutionEventSenderManager = 
logicTaskInstanceExecutionEventSenderManager;
@@ -68,7 +68,7 @@ public abstract class MasterTaskExecuteRunnable implements 
Runnable {
                 "Get a exception when execute the task, sent the task execute 
result to master, the current task execute result is {}",
                 taskExecutionContext.getCurrentExecutionStatus());
         
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
-        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
+        
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
         log.info("Get a exception when execute the task, removed the 
TaskExecutionContext");
         closeLogAppender();
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
similarity index 83%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
index 0fd79dfabe..d1f76aedbc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
@@ -19,8 +19,8 @@ package 
org.apache.dolphinscheduler.server.master.runner.execute;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
-public interface MasterDelayTaskExecuteRunnableFactory<T extends 
MasterDelayTaskExecuteRunnable> {
+public interface MasterTaskExecutorFactory<T extends MasterTaskExecutor> {
 
-    T createWorkerTaskExecuteRunnable(TaskExecutionContext 
taskExecutionContext);
+    T createMasterTaskExecutor(TaskExecutionContext taskExecutionContext);
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
similarity index 75%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
index c5689f6a1b..84c342c074 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
@@ -28,24 +28,27 @@ import org.springframework.stereotype.Component;
 
 import com.google.common.collect.Sets;
 
+/**
+ * Used to create the MasterTaskExecutorFactory for a given task type.
+ */
 @Component
-public class MasterTaskExecuteRunnableFactoryBuilder {
+public class MasterTaskExecutorFactoryBuilder {
 
     @Autowired
-    private AsyncMasterDelayTaskExecuteRunnableFactory 
asyncMasterDelayTaskExecuteRunnableFactory;
+    private AsyncMasterTaskExecutorFactory asyncMasterTaskExecutorFactory;
 
     @Autowired
-    private SyncMasterDelayTaskExecuteRunnableFactory 
syncMasterDelayTaskExecuteRunnableFactory;
+    private SyncMasterTaskExecutorFactory syncMasterTaskExecutorFactory;
 
     private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet(
             DependentLogicTask.TASK_TYPE,
             SubWorkflowLogicTask.TASK_TYPE,
             DynamicLogicTask.TASK_TYPE);
 
-    public MasterDelayTaskExecuteRunnableFactory<? extends 
MasterDelayTaskExecuteRunnable> 
createWorkerDelayTaskExecuteRunnableFactory(String taskType) {
+    public MasterTaskExecutorFactory<? extends MasterTaskExecutor> 
createMasterTaskExecutorFactory(String taskType) {
         if (ASYNC_TASK_TYPE.contains(taskType)) {
-            return asyncMasterDelayTaskExecuteRunnableFactory;
+            return asyncMasterTaskExecutorFactory;
         }
-        return syncMasterDelayTaskExecuteRunnableFactory;
+        return syncMasterTaskExecutorFactory;
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
similarity index 66%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
index 6b29897611..983542cae2 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
@@ -25,20 +25,20 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @UtilityClass
-public class MasterTaskExecuteRunnableHolder {
+public class MasterTaskExecutorHolder {
 
-    private static final Map<Integer, MasterTaskExecuteRunnable> 
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
+    private static final Map<Integer, MasterTaskExecutor> 
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
 
-    public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
-        
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
-                masterTaskExecuteRunnable);
+    public void putMasterTaskExecuteRunnable(MasterTaskExecutor 
masterTaskExecutor) {
+        
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecutor.getTaskExecutionContext().getTaskInstanceId(),
+                masterTaskExecutor);
     }
 
-    public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer 
taskInstanceId) {
+    public MasterTaskExecutor getMasterTaskExecutor(Integer taskInstanceId) {
         return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
     }
 
-    public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) {
+    public void removeMasterTaskExecutor(Integer taskInstanceId) {
         SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId);
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
similarity index 89%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
index 4f542556e4..d61d058d22 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
@@ -30,7 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 @Slf4j
 @Component
-public class MasterTaskExecuteRunnableThreadPool {
+public class MasterTaskExecutorThreadPool {
 
     @Autowired
     private MasterConfig masterConfig;
@@ -44,8 +44,8 @@ public class MasterTaskExecuteRunnableThreadPool {
         log.info("MasterTaskExecuteRunnableThreadPool started...");
     }
 
-    public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
-        listeningExecutorService.submit(masterTaskExecuteRunnable);
+    public void submitMasterTaskExecutor(MasterTaskExecutor 
masterTaskExecutor) {
+        listeningExecutorService.submit(masterTaskExecutor);
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
similarity index 82%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
index 62365f78cb..7f303487ef 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
@@ -27,11 +27,11 @@ import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class SyncMasterDelayTaskExecuteRunnable extends 
MasterDelayTaskExecuteRunnable {
+public class SyncMasterTaskExecutor extends MasterTaskExecutor {
 
-    public SyncMasterDelayTaskExecuteRunnable(TaskExecutionContext 
taskExecutionContext,
-                                              LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
-                                              
LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager) {
+    public SyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+                                  LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
+                                  LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager) {
         super(taskExecutionContext, logicTaskPluginFactoryBuilder, 
logicTaskInstanceExecutionEventSenderManager);
     }
 
@@ -56,7 +56,7 @@ public class SyncMasterDelayTaskExecuteRunnable extends 
MasterDelayTaskExecuteRu
                 taskExecutionContext.getCurrentExecutionStatus().name());
         closeLogAppender();
         
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
-        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
+        
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
similarity index 81%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
index 2bf829c021..b591711e3d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
@@ -28,9 +28,9 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class SyncMasterDelayTaskExecuteRunnableFactory
+public class SyncMasterTaskExecutorFactory
         implements
-            
MasterDelayTaskExecuteRunnableFactory<SyncMasterDelayTaskExecuteRunnable> {
+            MasterTaskExecutorFactory<SyncMasterTaskExecutor> {
 
     @Autowired
     private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder;
@@ -38,8 +38,8 @@ public class SyncMasterDelayTaskExecuteRunnableFactory
     private LogicTaskInstanceExecutionEventSenderManager 
logicTaskInstanceExecutionEventSenderManager;
 
     @Override
-    public SyncMasterDelayTaskExecuteRunnable 
createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
-        return new SyncMasterDelayTaskExecuteRunnable(taskExecutionContext, 
logicTaskPluginFactoryBuilder,
+    public SyncMasterTaskExecutor 
createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) {
+        return new SyncMasterTaskExecutor(taskExecutionContext, 
logicTaskPluginFactoryBuilder,
                 logicTaskInstanceExecutionEventSenderManager);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 6294f581ec..6f0419ae97 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -17,19 +17,41 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public abstract class BaseTaskExecuteRunnableDispatchOperator implements 
TaskExecuteRunnableOperator {
 
     private final GlobalTaskDispatchWaitingQueue 
globalTaskDispatchWaitingQueue;
 
-    public 
BaseTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue 
globalTaskDispatchWaitingQueue) {
+    private final TaskInstanceDao taskInstanceDao;
+
+    public BaseTaskExecuteRunnableDispatchOperator(
+                                                   
GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue,
+                                                   TaskInstanceDao 
taskInstanceDao) {
         this.globalTaskDispatchWaitingQueue = globalTaskDispatchWaitingQueue;
+        this.taskInstanceDao = taskInstanceDao;
     }
 
     @Override
     public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
+        long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
+        TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+        if (remainTime > 0) {
+            taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
+            taskInstanceDao.updateById(taskInstance);
+            log.info("Current taskInstance: {} is choose delay execution, 
delay time: {}/s, remainTime: {}/s",
+                    taskInstance.getName(),
+                    taskInstance.getDelayTime(), remainTime);
+        }
         
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
index a1d0a893fc..1b7a92db98 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
@@ -20,7 +20,7 @@ package 
org.apache.dolphinscheduler.server.master.runner.operator;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 
 import java.util.Date;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
index 383752d3f3..8163817afc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
index 4068bfa082..ef9dc80901 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 
 import java.util.Date;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
index d5b0802372..ed1b777aeb 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
 
 import org.springframework.stereotype.Component;
@@ -24,8 +25,9 @@ import org.springframework.stereotype.Component;
 @Component
 public class LogicTaskExecuteRunnableDispatchOperator extends 
BaseTaskExecuteRunnableDispatchOperator {
 
-    public 
LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue 
globalTaskDispatchWaitingQueue) {
-        super(globalTaskDispatchWaitingQueue);
+    public 
LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue 
globalTaskDispatchWaitingQueue,
+                                                    TaskInstanceDao 
taskInstanceDao) {
+        super(globalTaskDispatchWaitingQueue, taskInstanceDao);
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
index 5a9f070641..5a31f1138d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
@@ -17,20 +17,16 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class TaskExecuteRunnableDispatchOperator implements 
TaskExecuteRunnableOperator {
+public class TaskExecuteRunnableDispatchOperator extends 
BaseTaskExecuteRunnableDispatchOperator {
 
-    @Autowired
-    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
-    @Override
-    public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
-        
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
+    public TaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue 
globalTaskDispatchWaitingQueue,
+                                               TaskInstanceDao 
taskInstanceDao) {
+        super(globalTaskDispatchWaitingQueue, taskInstanceDao);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
index d270103da1..1d397e3575 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 
 public interface TaskExecuteRunnableOperator {
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
index b217aff866..1b92f5e75c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.operator;
 
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
 
 import org.springframework.beans.factory.annotation.Autowired;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
index 39896bb0cb..f450044377 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
@@ -22,7 +22,7 @@ import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcCli
 import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index d121d7e198..28c0010e3b 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -39,7 +39,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
index c9710209e5..f57c9b6a68 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
@@ -20,7 +20,7 @@ package 
org.apache.dolphinscheduler.server.master.runner.dispatcher;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
index 9b447aada3..46c4f53e1a 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
@@ -23,7 +23,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
 import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
 import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
 
 import java.util.Optional;
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
similarity index 89%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
index 3a3a80b3ca..778884e066 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
@@ -21,12 +21,14 @@ import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class PriorityTaskExecuteRunnableTest {
+public class PriorityDelayTaskExecuteRunnableTest {
 
     @Test
     public void testCompareTo() {
@@ -46,9 +48,9 @@ public class PriorityTaskExecuteRunnableTest {
 
         TaskExecutionContext context1 = new TaskExecutionContext();
         TaskExecutionContext context2 = new TaskExecutionContext();
-        PriorityTaskExecuteRunnable p1 =
+        PriorityDelayTaskExecuteRunnable p1 =
                 new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, 
taskOperatorManager);
-        PriorityTaskExecuteRunnable p2 =
+        PriorityDelayTaskExecuteRunnable p2 =
                 new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, 
taskOperatorManager);
 
         Assertions.assertEquals(0, p1.compareTo(p2));
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
index 3c24977c4a..d5a858a437 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.shell;
 
+import org.apache.dolphinscheduler.common.constants.TenantConstants;
 import org.apache.dolphinscheduler.common.exception.FileOperateException;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -67,8 +68,10 @@ public abstract class BaseLinuxShellInterceptorBuilder<T 
extends BaseLinuxShellI
 
     protected List<String> generateBootstrapCommand() throws 
FileOperateException {
         if (sudoEnable) {
-            // Set the tenant owner as the working directory
-            FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser);
+            if (!TenantConstants.BOOTSTRAPT_SYSTEM_USER.equals(runUser)) {
+                // Set the tenant owner as the working directory
+                FileUtils.setDirectoryOwner(Paths.get(shellDirectory), 
runUser);
+            }
             return bootstrapCommandInSudoMode();
         }
         return bootstrapCommandInNormalMode();
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 082d784aab..e61409c9d2 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,7 +29,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueueLooper;
+import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -68,7 +68,7 @@ public class WorkerServer implements IStoppable {
     private MessageRetryRunner messageRetryRunner;
 
     @Autowired
-    private GlobalTaskInstanceDispatchQueueLooper 
globalTaskInstanceDispatchQueueLooper;
+    private GlobalTaskInstanceWaitingQueueLooper 
globalTaskInstanceWaitingQueueLooper;
 
     /**
      * worker server startup, not use web service
@@ -91,7 +91,7 @@ public class WorkerServer implements IStoppable {
         this.workerManagerThread.start();
 
         this.messageRetryRunner.start();
-        this.globalTaskInstanceDispatchQueueLooper.start();
+        this.globalTaskInstanceWaitingQueueLooper.start();
 
         /*
          * registry hooks, which are called before the process exits
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index 2ab6ed8ecd..dc97a2c9a4 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue;
+import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 
 import java.time.Duration;
@@ -57,7 +57,7 @@ public class WorkerWaitingStrategy implements 
WorkerConnectStrategy {
     private WorkerManagerThread workerManagerThread;
 
     @Autowired
-    private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
 
     @Override
     public void disconnect() {
@@ -121,7 +121,7 @@ public class WorkerWaitingStrategy implements 
WorkerConnectStrategy {
         workerRpcServer.close();
         log.warn("Worker server close the RPC server due to lost connection 
from registry");
         workerManagerThread.clearTask();
-        globalTaskInstanceDispatchQueue.clearTask();
+        globalTaskInstanceWaitingQueue.clearTask();
         log.warn("Worker server clear the tasks due to lost connection from 
registry");
         messageRetryRunner.clearMessage();
         log.warn("Worker server clear the retry message due to lost connection 
from registry");
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
index 3466d52c28..c57ee2efca 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
@@ -26,7 +26,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,12 +53,12 @@ public class StreamingTaskInstanceOperatorImpl implements 
IStreamingTaskInstance
                 log.error("Cannot find TaskExecutionContext for taskInstance: 
{}", taskInstanceId);
                 return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
TaskExecutionContext");
             }
-            WorkerTaskExecuteRunnable workerTaskExecuteRunnable = 
workerManager.getTaskExecuteThread(taskInstanceId);
-            if (workerTaskExecuteRunnable == null) {
+            WorkerTaskExecutor workerTaskExecutor = 
workerManager.getTaskExecuteThread(taskInstanceId);
+            if (workerTaskExecutor == null) {
                 log.error("Cannot find WorkerTaskExecuteRunnable for 
taskInstance: {}", taskInstanceId);
                 return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
WorkerTaskExecuteRunnable");
             }
-            AbstractTask task = workerTaskExecuteRunnable.getTask();
+            AbstractTask task = workerTaskExecutor.getTask();
             if (task == null) {
                 log.error("Cannot find StreamTask for taskInstance:{}", 
taskInstanceId);
                 return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
StreamTask");
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
deleted file mode 100644
index 631b29cc8e..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import javax.annotation.Nullable;
-
-import lombok.NonNull;
-
-public class DefaultWorkerDelayTaskExecuteRunnableFactory
-        extends
-            
WorkerDelayTaskExecuteRunnableFactory<DefaultWorkerDelayTaskExecuteRunnable> {
-
-    protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull 
TaskExecutionContext taskExecutionContext,
-                                                           @NonNull 
WorkerConfig workerConfig,
-                                                           @NonNull 
WorkerMessageSender workerMessageSender,
-                                                           @NonNull 
TaskPluginManager taskPluginManager,
-                                                           @Nullable 
StorageOperate storageOperate,
-                                                           @NonNull 
WorkerRegistryClient workerRegistryClient) {
-        super(taskExecutionContext,
-                workerConfig,
-                workerMessageSender,
-                taskPluginManager,
-                storageOperate,
-                workerRegistryClient);
-    }
-
-    @Override
-    public DefaultWorkerDelayTaskExecuteRunnable 
createWorkerTaskExecuteRunnable() {
-        return new DefaultWorkerDelayTaskExecuteRunnable(
-                taskExecutionContext,
-                workerConfig,
-                workerMessageSender,
-                taskPluginManager,
-                storageOperate,
-                workerRegistryClient);
-    }
-}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
similarity index 76%
rename from 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
rename to 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
index 82b91e16ba..19421ee05e 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
@@ -30,14 +30,14 @@ import javax.annotation.Nullable;
 
 import lombok.NonNull;
 
-public class DefaultWorkerDelayTaskExecuteRunnable extends 
WorkerDelayTaskExecuteRunnable {
+public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor {
 
-    public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext 
taskExecutionContext,
-                                                 @NonNull WorkerConfig 
workerConfig,
-                                                 @NonNull WorkerMessageSender 
workerMessageSender,
-                                                 @NonNull TaskPluginManager 
taskPluginManager,
-                                                 @Nullable StorageOperate 
storageOperate,
-                                                 @NonNull WorkerRegistryClient 
workerRegistryClient) {
+    public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext 
taskExecutionContext,
+                                     @NonNull WorkerConfig workerConfig,
+                                     @NonNull WorkerMessageSender 
workerMessageSender,
+                                     @NonNull TaskPluginManager 
taskPluginManager,
+                                     @Nullable StorageOperate storageOperate,
+                                     @NonNull WorkerRegistryClient 
workerRegistryClient) {
         super(taskExecutionContext,
                 workerConfig,
                 workerMessageSender,
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
similarity index 55%
rename from 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
rename to 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
index f2d39835b1..0141a5cd17 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
@@ -28,24 +28,23 @@ import javax.annotation.Nullable;
 
 import lombok.NonNull;
 
-public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends 
WorkerDelayTaskExecuteRunnable>
+public class DefaultWorkerTaskExecutorFactory
         implements
-            WorkerTaskExecuteRunnableFactory<T> {
-
-    protected final @NonNull TaskExecutionContext taskExecutionContext;
-    protected final @NonNull WorkerConfig workerConfig;
-    protected final @NonNull WorkerMessageSender workerMessageSender;
-    protected final @NonNull TaskPluginManager taskPluginManager;
-    protected final @Nullable StorageOperate storageOperate;
-    protected final @NonNull WorkerRegistryClient workerRegistryClient;
-
-    protected WorkerDelayTaskExecuteRunnableFactory(
-                                                    @NonNull 
TaskExecutionContext taskExecutionContext,
-                                                    @NonNull WorkerConfig 
workerConfig,
-                                                    @NonNull 
WorkerMessageSender workerMessageSender,
-                                                    @NonNull TaskPluginManager 
taskPluginManager,
-                                                    @Nullable StorageOperate 
storageOperate,
-                                                    @NonNull 
WorkerRegistryClient workerRegistryClient) {
+            WorkerTaskExecutorFactory<DefaultWorkerTaskExecutor> {
+
+    private final @NonNull TaskExecutionContext taskExecutionContext;
+    private final @NonNull WorkerConfig workerConfig;
+    private final @NonNull WorkerMessageSender workerMessageSender;
+    private final @NonNull TaskPluginManager taskPluginManager;
+    private final @Nullable StorageOperate storageOperate;
+    private final @NonNull WorkerRegistryClient workerRegistryClient;
+
+    public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext 
taskExecutionContext,
+                                            @NonNull WorkerConfig workerConfig,
+                                            @NonNull WorkerMessageSender 
workerMessageSender,
+                                            @NonNull TaskPluginManager 
taskPluginManager,
+                                            @Nullable StorageOperate 
storageOperate,
+                                            @NonNull WorkerRegistryClient 
workerRegistryClient) {
         this.taskExecutionContext = taskExecutionContext;
         this.workerConfig = workerConfig;
         this.workerMessageSender = workerMessageSender;
@@ -54,5 +53,14 @@ public abstract class 
WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
         this.workerRegistryClient = workerRegistryClient;
     }
 
-    public abstract T createWorkerTaskExecuteRunnable();
+    @Override
+    public DefaultWorkerTaskExecutor createWorkerTaskExecutor() {
+        return new DefaultWorkerTaskExecutor(
+                taskExecutionContext,
+                workerConfig,
+                workerMessageSender,
+                taskPluginManager,
+                storageOperate,
+                workerRegistryClient);
+    }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
similarity index 95%
rename from 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
rename to 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
index 2b894913a7..e1ac97b2d3 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
@@ -31,13 +31,13 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class GlobalTaskInstanceDispatchQueue {
+public class GlobalTaskInstanceWaitingQueue {
 
     private final WorkerConfig workerConfig;
 
     private final BlockingQueue<TaskExecutionContext> blockingQueue;
 
-    public GlobalTaskInstanceDispatchQueue(WorkerConfig workerConfig) {
+    public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.blockingQueue = new 
ArrayBlockingQueue<>(workerConfig.getExecThreads());
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
similarity index 66%
rename from 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
rename to 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
index 654a1dd9fe..6e501a1be6 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
@@ -18,12 +18,9 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import 
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@@ -36,10 +33,10 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread {
+public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread {
 
     @Autowired
-    private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
 
     @Autowired
     private WorkerConfig workerConfig;
@@ -59,7 +56,7 @@ public class GlobalTaskInstanceDispatchQueueLooper extends 
BaseDaemonThread {
     @Autowired
     private WorkerRegistryClient workerRegistryClient;
 
-    protected GlobalTaskInstanceDispatchQueueLooper() {
+    protected GlobalTaskInstanceWaitingQueueLooper() {
         super("GlobalTaskDispatchQueueLooper");
     }
 
@@ -72,39 +69,25 @@ public class GlobalTaskInstanceDispatchQueueLooper extends 
BaseDaemonThread {
     public void run() {
         while (true) {
             try {
-                TaskExecutionContext taskExecutionContext = 
globalTaskInstanceDispatchQueue.take();
+                TaskExecutionContext taskExecutionContext = 
globalTaskInstanceWaitingQueue.take();
                 
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
                 
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
 
-                int delayTime = taskExecutionContext.getDelayTime();
-                if (delayTime > 0) {
-                    // delay task process
-                    long remainTime =
-                            DateUtils.getRemainTime(
-                                    
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
-                                    delayTime * 60L);
-                    if (remainTime > 0) {
-                        log.info("Current taskInstance is choose delay 
execution, delay time: {}s", remainTime);
-                        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
-                        // todo: use delay running event
-                        workerMessageSender.sendMessage(taskExecutionContext,
-                                
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
-                    }
-                }
-                WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = 
WorkerTaskExecuteRunnableFactoryBuilder
-                        .createWorkerDelayTaskExecuteRunnableFactory(
+                WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorFactoryBuilder
+                        .createWorkerTaskExecutorFactory(
                                 taskExecutionContext,
                                 workerConfig,
                                 workerMessageSender,
                                 taskPluginManager,
                                 storageOperate,
                                 workerRegistryClient)
-                        .createWorkerTaskExecuteRunnable();
-                if (workerManager.offer(workerTaskExecuteRunnable)) {
+                        .createWorkerTaskExecutor();
+                if (workerManager.offer(workerTaskExecutor)) {
                     log.info("Success submit WorkerDelayTaskExecuteRunnable to 
WorkerManagerThread's waiting queue");
                 }
             } catch (InterruptedException e) {
                 log.error("GlobalTaskDispatchQueueLooper interrupted");
+                Thread.currentThread().interrupt();
                 break;
             } catch (Exception ex) {
                 log.error("GlobalTaskDispatchQueueLooper error", ex);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
deleted file mode 100644
index 07d85dc57b..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import lombok.NonNull;
-
-public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed {
-
-    protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
-                                             @NonNull WorkerConfig workerConfig,
-                                             @NonNull WorkerMessageSender workerMessageSender,
-                                             @NonNull TaskPluginManager taskPluginManager,
-                                             @Nullable StorageOperate storageOperate,
-                                             @NonNull WorkerRegistryClient workerRegistryClient) {
-        super(taskExecutionContext,
-                workerConfig,
-                workerMessageSender,
-                taskPluginManager,
-                storageOperate,
-                workerRegistryClient);
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-        TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
-        return unit.convert(
-                DateUtils.getRemainTime(
-                        DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
-                        taskExecutionContext.getDelayTime() * 60L),
-                TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-        if (o == null) {
-            return 1;
-        }
-        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
-    }
-
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index f16e887bec..2a6b7feec2 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -37,25 +37,19 @@ public class WorkerExecService {
 
     private final ListeningExecutorService listeningExecutorService;
 
-    /**
-     * thread executor service
-     */
     private final ExecutorService execService;
 
-    /**
-     * running task
-     */
-    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap;
+    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap;
 
     public WorkerExecService(ExecutorService execService,
-                             ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
+                             ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap) {
         this.execService = execService;
         this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
         this.taskExecuteThreadMap = taskExecuteThreadMap;
         WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
     }
 
-    public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
+    public void submit(final WorkerTaskExecutor taskExecuteThread) {
         taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
         ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
         FutureCallback futureCallback = new FutureCallback() {
@@ -90,7 +84,7 @@ public class WorkerExecService {
         return ((ThreadPoolExecutor) this.execService).getActiveCount();
     }
 
-    public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
+    public Map<Integer, WorkerTaskExecutor> getTaskExecuteThreadMap() {
         return taskExecuteThreadMap;
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 3fdcec4e89..4b666cfb20 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -25,8 +25,9 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.Nullable;
 
@@ -41,28 +42,22 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class WorkerManagerThread implements Runnable {
 
-    private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
+    private final BlockingQueue<WorkerTaskExecutor> waitSubmitQueue;
     private final WorkerExecService workerExecService;
-    private final WorkerConfig workerConfig;
 
     private final int workerExecThreads;
 
-    /**
-     * running task
-     */
-    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap =
-            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap = new ConcurrentHashMap<>();
 
     public WorkerManagerThread(WorkerConfig workerConfig) {
-        this.workerConfig = workerConfig;
         workerExecThreads = workerConfig.getExecThreads();
-        this.waitSubmitQueue = new DelayQueue<>();
+        this.waitSubmitQueue = new LinkedBlockingQueue<>();
         workerExecService = new WorkerExecService(
                 ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
                 taskExecuteThreadMap);
     }
 
-    public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
+    public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer taskInstanceId) {
         return taskExecuteThreadMap.get(taskInstanceId);
     }
 
@@ -95,7 +90,7 @@ public class WorkerManagerThread implements Runnable {
                 .forEach(waitSubmitQueue::remove);
     }
 
-    public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
+    public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) {
         return waitSubmitQueue.add(workerDelayTaskExecuteRunnable);
     }
 
@@ -122,8 +117,8 @@ public class WorkerManagerThread implements Runnable {
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
                 if (this.getThreadPoolQueueSize() <= workerExecThreads) {
-                    final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
-                    workerExecService.submit(workerDelayTaskExecuteRunnable);
+                    WorkerTaskExecutor workerTaskExecutor = waitSubmitQueue.take();
+                    workerExecService.submit(workerTaskExecutor);
                 } else {
                     WorkerServerMetrics.incWorkerOverloadCount();
                     log.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
similarity index 95%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index 15141ed747..c0c7c35b7d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -67,9 +67,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
 
-public abstract class WorkerTaskExecuteRunnable implements Runnable {
+public abstract class WorkerTaskExecutor implements Runnable {
 
-    protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecuteRunnable.class);
+    protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecutor.class);
 
     protected final TaskExecutionContext taskExecutionContext;
     protected final WorkerConfig workerConfig;
@@ -80,13 +80,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
 
     protected @Nullable AbstractTask task;
 
-    protected WorkerTaskExecuteRunnable(
-                                        @NonNull TaskExecutionContext taskExecutionContext,
-                                        @NonNull WorkerConfig workerConfig,
-                                        @NonNull WorkerMessageSender workerMessageSender,
-                                        @NonNull TaskPluginManager taskPluginManager,
-                                        @Nullable StorageOperate storageOperate,
-                                        @NonNull WorkerRegistryClient workerRegistryClient) {
+    protected WorkerTaskExecutor(
+                                 @NonNull TaskExecutionContext taskExecutionContext,
+                                 @NonNull WorkerConfig workerConfig,
+                                 @NonNull WorkerMessageSender workerMessageSender,
+                                 @NonNull TaskPluginManager taskPluginManager,
+                                 @Nullable StorageOperate storageOperate,
+                                 @NonNull WorkerRegistryClient workerRegistryClient) {
         this.taskExecutionContext = taskExecutionContext;
         this.workerConfig = workerConfig;
         this.workerMessageSender = workerMessageSender;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
similarity index 90%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
index 441662f4bc..5a185dbba0 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-public interface WorkerTaskExecuteRunnableFactory<T> {
+public interface WorkerTaskExecutorFactory<T> {
 
-    T createWorkerTaskExecuteRunnable();
+    T createWorkerTaskExecutor();
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
similarity index 72%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index 603462e5b0..c2efdc9c7a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -30,15 +30,15 @@ import lombok.NonNull;
 import lombok.experimental.UtilityClass;
 
 @UtilityClass
-public class WorkerTaskExecuteRunnableFactoryBuilder {
+public class WorkerTaskExecutorFactoryBuilder {
 
-    public static WorkerDelayTaskExecuteRunnableFactory<?> createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
-                                                                               
                        @NonNull WorkerConfig workerConfig,
-                                                                               
                        @NonNull WorkerMessageSender workerMessageSender,
-                                                                               
                        @NonNull TaskPluginManager taskPluginManager,
-                                                                               
                        @Nullable StorageOperate storageOperate,
-                                                                               
                        @NonNull WorkerRegistryClient workerRegistryClient) {
-        return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
+    public static WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext,
+                                                                               
                           @NonNull WorkerConfig workerConfig,
+                                                                               
                           @NonNull WorkerMessageSender workerMessageSender,
+                                                                               
                           @NonNull TaskPluginManager taskPluginManager,
+                                                                               
                           @Nullable StorageOperate storageOperate,
+                                                                               
                           @NonNull WorkerRegistryClient workerRegistryClient) {
+        return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
                 workerConfig,
                 workerMessageSender,
                 taskPluginManager,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 3ff110c459..4d3ebc9aa9 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
-import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue;
+import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,7 +41,7 @@ public class TaskInstanceDispatchOperationFunction
     private WorkerConfig workerConfig;
 
     @Autowired
-    private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
 
     @Override
     public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
@@ -57,7 +57,7 @@ public class TaskInstanceDispatchOperationFunction
                     taskExecutionContext.getTaskInstanceId());
             TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
 
-            if (!globalTaskInstanceDispatchQueue.addDispatchTask(taskExecutionContext)) {
+            if (!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
                 log.error("Submit task: {} to wait queue error, current queue size: {} is full",
                         taskExecutionContext.getTaskName(), workerConfig.getExecThreads());
                 return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index 347a52fa50..c5f4ffb78b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -103,12 +103,12 @@ public class TaskInstanceKillOperationFunction
     }
 
     protected void cancelApplication(int taskInstanceId) {
-        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
-        if (workerTaskExecuteRunnable == null) {
+        WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId);
+        if (workerTaskExecutor == null) {
             log.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
             return;
         }
-        AbstractTask task = workerTaskExecuteRunnable.getTask();
+        AbstractTask task = workerTaskExecutor.getTask();
         if (task == null) {
             log.warn("task not found, taskInstanceId:{}", taskInstanceId);
             return;
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
similarity index 91%
rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
rename to dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
index 79d04edae1..43ef6f87d0 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class DefaultWorkerDelayTaskExecuteRunnableTest {
+public class DefaultWorkerTaskExecutorTest {
 
     private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
 
@@ -54,7 +54,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
                 .processDefineId(0)
                 .firstSubmitTime(System.currentTimeMillis())
                 .build();
-        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
+        WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
                 taskExecutionContext,
                 workerConfig,
                 workerMessageSender,
@@ -62,7 +62,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
                 storageOperate,
                 workerRegistryClient);
 
-        Assertions.assertAll(workerTaskExecuteRunnable::run);
+        Assertions.assertAll(workerTaskExecutor::run);
         Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
     }
 
@@ -78,7 +78,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
                 .taskParams(
                         "{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select * from t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}")
                 .build();
-        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
+        WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
                 taskExecutionContext,
                 workerConfig,
                 workerMessageSender,
@@ -86,7 +86,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
                 storageOperate,
                 workerRegistryClient);
 
-        Assertions.assertAll(workerTaskExecuteRunnable::run);
+        Assertions.assertAll(workerTaskExecutor::run);
         Assertions.assertEquals(TaskExecutionStatus.FAILURE, taskExecutionContext.getCurrentExecutionStatus());
     }
 }

Reply via email to