This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5523a62825 Remove taskQueue and looper in worker (#15292)
5523a62825 is described below

commit 5523a62825eb7168de8cb2fda564d5a923b5edd4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 27 19:51:45 2023 +0800

    Remove taskQueue and looper in worker (#15292)
---
 .../common/thread/ThreadUtils.java                 |   6 +-
 .../server/master/config/MasterConfig.java         |   4 +-
 ...ogicITaskInstanceDispatchOperationFunction.java |  16 +-
 .../LogicITaskInstanceKillOperationFunction.java   |   8 +-
 .../GlobalMasterTaskExecuteRunnableQueue.java      |  52 ------
 ...GlobalMasterTaskExecuteRunnableQueueLooper.java |  84 ---------
 .../runner/GlobalTaskDispatchWaitingQueue.java     |   8 +-
 .../GlobalTaskDispatchWaitingQueueLooper.java      |  16 +-
 .../master/runner/MasterTaskExecutorBootstrap.java |   6 -
 .../runner/dispatcher/TaskDispatchFactory.java     |   5 +
 .../execute/AsyncMasterTaskDelayQueueLooper.java   |  15 +-
 .../IMasterTaskExecutorThreadPool.java}            |  24 +--
 .../execute/MasterAsyncTaskExecutorThreadPool.java |  58 ++++++
 ....java => MasterSyncTaskExecutorThreadPool.java} |  34 ++--
 .../MasterTaskExecutorThreadPoolManager.java       |  59 +++++++
 .../BaseTaskExecuteRunnableDispatchOperator.java   |   2 +-
 .../plugin/task/api/AbstractCommandExecutor.java   |  16 +-
 .../task/api/TaskExecutionContextCacheManager.java |  72 --------
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java  |  12 +-
 .../plugin/task/api/k8s/K8sTaskExecutorTest.java   |   4 +-
 .../plugin/task/dvc/DvcTaskTest.java               |   2 -
 .../plugin/kubeflow/KubeflowTaskTest.java          |   2 -
 .../plugin/task/mlflow/MlflowTaskTest.java         |   2 -
 .../plugin/task/pytorch/PytorchTaskTest.java       |   2 -
 .../server/worker/WorkerServer.java                |  38 ++--
 .../server/worker/message/MessageRetryRunner.java  |  13 +-
 .../worker/registry/WorkerRegistryClient.java      |   6 +-
 .../worker/registry/WorkerWaitingStrategy.java     |  11 +-
 .../rpc/StreamingTaskInstanceOperatorImpl.java     |  19 +-
 .../server/worker/rpc/WorkerLogServiceImpl.java    |  14 +-
 .../runner/GlobalTaskInstanceWaitingQueue.java     |  70 --------
 .../GlobalTaskInstanceWaitingQueueLooper.java      | 101 -----------
 .../server/worker/runner/TaskCallbackImpl.java     |  18 +-
 .../server/worker/runner/WorkerExecService.java    |  91 ----------
 .../server/worker/runner/WorkerManagerThread.java  | 150 ----------------
 .../server/worker/runner/WorkerTaskExecutor.java   |   9 +-
 .../runner/WorkerTaskExecutorFactoryBuilder.java   |  33 ++--
 .../worker/runner/WorkerTaskExecutorHolder.java    |  54 ++++++
 .../runner/WorkerTaskExecutorThreadPool.java       |  96 ++++++++++
 .../TaskInstanceDispatchOperationFunction.java     |  20 ++-
 .../TaskInstanceKillOperationFunction.java         |  37 ++--
 .../UpdateWorkflowHostOperationFunction.java       |  33 ++--
 .../worker/registry/WorkerRegistryClientTest.java  |   4 +-
 .../runner/WorkerTaskExecutorThreadPoolTest.java   | 194 +++++++++++++++++++++
 44 files changed, 651 insertions(+), 869 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 3b66552274..5eef04ed82 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.dolphinscheduler.common.thread;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -38,9 +38,9 @@ public class ThreadUtils {
      * @param threadsNum threadsNum
      * @return ExecutorService
      */
-    public static ExecutorService newDaemonFixedThreadExecutor(String 
threadName, int threadsNum) {
+    public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String 
threadName, int threadsNum) {
         ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
-        return Executors.newFixedThreadPool(threadsNum, threadFactory);
+        return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, 
threadFactory);
     }
 
     public static ScheduledExecutorService 
newSingleDaemonScheduledExecutorService(String threadName) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index f1b8485c69..d7f10f03fb 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -64,9 +64,9 @@ public class MasterConfig implements Validator {
     private int execThreads = 10;
 
     // todo: change to sync thread pool/ async thread pool ?
-    private int masterTaskExecuteThreadPoolSize = 
Runtime.getRuntime().availableProcessors();
+    private int masterSyncTaskExecutorThreadPoolSize = 
Runtime.getRuntime().availableProcessors();
 
-    private int masterAsyncTaskStateCheckThreadPoolSize = 
Runtime.getRuntime().availableProcessors();
+    private int masterAsyncTaskExecutorThreadPoolSize = 
Runtime.getRuntime().availableProcessors();
     /**
      * The task dispatch thread pool size.
      */
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
index f69ff4fb77..7590708208 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
@@ -21,10 +21,10 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchR
 import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import 
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,7 +41,7 @@ public class LogicITaskInstanceDispatchOperationFunction
     private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
 
     @Autowired
-    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
+    private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
 
     @Override
     public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest 
taskDispatchRequest) {
@@ -62,16 +62,12 @@ public class LogicITaskInstanceDispatchOperationFunction
             MasterTaskExecutor masterTaskExecutor = 
masterTaskExecutorFactoryBuilder
                     
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
                     .createMasterTaskExecutor(taskExecutionContext);
-            if (globalMasterTaskExecuteRunnableQueue
-                    .submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
-                log.info("Submit LogicTask: {} to 
MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
+            if 
(masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor)) {
+                log.info("Submit LogicTask: {} to MasterTaskExecutorThreadPool 
success", taskInstanceName);
                 return LogicTaskDispatchResponse.success(taskInstanceId);
             } else {
-                log.error(
-                        "Submit LogicTask: {} to 
MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue 
size: {} is full",
-                        taskInstanceName, 
globalMasterTaskExecuteRunnableQueue.size());
-                return LogicTaskDispatchResponse.failed(taskInstanceId,
-                        "MasterDelayTaskExecuteRunnableDelayQueue is full");
+                log.error("Submit LogicTask: {} to 
MasterTaskExecutorThreadPool failed", taskInstanceName);
+                return LogicTaskDispatchResponse.failed(taskInstanceId, 
"MasterTaskExecutorThreadPool is full");
             }
         } finally {
             LogUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
index 27fc52333d..121c5ae6e1 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
@@ -21,10 +21,10 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
 import 
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import 
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
             ITaskInstanceOperationFunction<LogicTaskKillRequest, 
LogicTaskKillResponse> {
 
     @Autowired
-    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
+    private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
 
     @Override
     public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) 
{
@@ -54,8 +54,8 @@ public class LogicITaskInstanceKillOperationFunction
             }
             try {
                 masterTaskExecutor.cancelTask();
-                globalMasterTaskExecuteRunnableQueue
-                        .removeMasterTaskExecuteRunnable(masterTaskExecutor);
+                // todo: if we remove success then we don't need to cancel?
+                
masterTaskExecutorThreadPool.removeMasterTaskExecutor(masterTaskExecutor);
                 return LogicTaskKillResponse.success();
             } catch (MasterTaskExecuteException e) {
                 log.error("Cancel MasterTaskExecuteRunnable failed ", e);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
deleted file mode 100644
index 9005416b92..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.springframework.stereotype.Component;
-
-/**
- *
- */
-@Component
-public class GlobalMasterTaskExecuteRunnableQueue {
-
-    private final BlockingQueue<MasterTaskExecutor> 
masterTaskExecutorBlockingQueue =
-            new LinkedBlockingQueue<>();
-
-    public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor 
masterTaskExecutor) {
-        return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
-    }
-
-    public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws 
InterruptedException {
-        return masterTaskExecutorBlockingQueue.take();
-    }
-
-    public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor 
masterTaskExecutor) {
-        return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor);
-    }
-
-    public int size() {
-        return masterTaskExecutorBlockingQueue.size();
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
deleted file mode 100644
index e0b1f80704..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalMasterTaskExecuteRunnableQueueLooper extends 
BaseDaemonThread implements AutoCloseable {
-
-    @Autowired
-    private GlobalMasterTaskExecuteRunnableQueue 
globalMasterTaskExecuteRunnableQueue;
-
-    @Autowired
-    private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;
-
-    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
-
-    public GlobalMasterTaskExecuteRunnableQueueLooper() {
-        super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
-    }
-
-    @Override
-    public synchronized void start() {
-        if (!RUNNING_FLAG.compareAndSet(false, true)) {
-            log.error("The MasterDelayTaskExecuteRunnableDelayQueueLooper 
already started, will not start again");
-            return;
-        }
-        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
-        super.start();
-        masterTaskExecutorThreadPool.start();
-        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
-    }
-
-    @Override
-    public void run() {
-        while (RUNNING_FLAG.get()) {
-            try {
-                final MasterTaskExecutor masterTaskExecutor =
-                        
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
-                
masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
-                
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has 
been interrupted, will stop loop");
-                break;
-            }
-        }
-        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stop 
loop...");
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (RUNNING_FLAG.compareAndSet(true, false)) {
-            log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper 
stopping...");
-            log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper 
stopped...");
-        }
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index f4d50537c6..7e0d683571 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -23,17 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.stereotype.Component;
 
+/**
+ * The class is used to store {@link TaskExecuteRunnable} which needs to be 
dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link 
DelayQueue},
+ * if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be 
consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ */
 @Slf4j
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
     private final DelayQueue<DefaultTaskExecuteRunnable> queue = new 
DelayQueue<>();
 
-    public void 
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable 
priorityTaskExecuteRunnable) {
+    public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable 
priorityTaskExecuteRunnable) {
         queue.put(priorityTaskExecuteRunnable);
     }
 
-    public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable() 
throws InterruptedException {
+    public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws 
InterruptedException {
         return queue.take();
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index b496bea5a5..a1f4b28783 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -42,7 +42,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
 
     private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
 
-    private final AtomicInteger DISPATCHED_TIMES = new AtomicInteger();
+    private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new 
AtomicInteger();
 
     private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
 
@@ -66,24 +66,24 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
         DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
         while (RUNNING_FLAG.get()) {
             try {
-                defaultTaskExecuteRunnable = 
globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
+                defaultTaskExecuteRunnable = 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
             } catch (InterruptedException e) {
                 log.warn("Get waiting dispatch task failed, the current thread 
has been interrupted, will stop loop");
                 Thread.currentThread().interrupt();
                 break;
             }
             try {
-                final TaskDispatcher taskDispatcher = taskDispatchFactory
-                        
.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
+                TaskDispatcher taskDispatcher =
+                        
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
                 taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
-                DISPATCHED_TIMES.set(0);
+                DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
             } catch (Exception e) {
                 
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
-                
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(defaultTaskExecuteRunnable);
-                if (DISPATCHED_TIMES.incrementAndGet() > 
MAX_DISPATCHED_FAILED_TIMES) {
+                
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
+                if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > 
MAX_DISPATCHED_FAILED_TIMES) {
                     ThreadUtils.sleep(10 * 1000L);
                 }
-                log.error("Dispatch task failed", e);
+                log.error("Dispatch Task: {} failed", 
defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
             }
         }
         log.info("GlobalTaskDispatchWaitingQueueLooper started...");
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
index 3e99d2141c..7d3bf6940b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
@@ -31,16 +31,12 @@ public class MasterTaskExecutorBootstrap implements 
AutoCloseable {
     @Autowired
     private GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper;
 
-    @Autowired
-    private GlobalMasterTaskExecuteRunnableQueueLooper 
globalMasterTaskExecuteRunnableQueueLooper;
-
     @Autowired
     private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;
 
     public synchronized void start() {
         log.info("MasterTaskExecutorBootstrap starting...");
         globalTaskDispatchWaitingQueueLooper.start();
-        globalMasterTaskExecuteRunnableQueueLooper.start();
         asyncMasterTaskDelayQueueLooper.start();
         log.info("MasterTaskExecutorBootstrap started...");
     }
@@ -51,8 +47,6 @@ public class MasterTaskExecutorBootstrap implements 
AutoCloseable {
         try (
                 final GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper1 =
                         globalTaskDispatchWaitingQueueLooper;
-                final GlobalMasterTaskExecuteRunnableQueueLooper 
globalMasterTaskExecuteRunnableQueueLooper1 =
-                        globalMasterTaskExecuteRunnableQueueLooper;
                 final AsyncMasterTaskDelayQueueLooper 
asyncMasterTaskDelayQueueLooper1 =
                         asyncMasterTaskDelayQueueLooper) {
             // closed the resource
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
index 1979b48de5..52469fb54c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner.dispatcher;
 
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,4 +39,8 @@ public class TaskDispatchFactory {
         return TaskUtils.isMasterTask(taskType) ? masterTaskDispatcher : 
workerTaskDispatcher;
     }
 
+    public TaskDispatcher getTaskDispatcher(TaskInstance taskInstance) {
+        return getTaskDispatcher(taskInstance.getTaskType());
+    }
+
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
index 63b8636be6..86b711f245 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
@@ -18,12 +18,9 @@
 package org.apache.dolphinscheduler.server.master.runner.execute;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.extern.slf4j.Slf4j;
@@ -39,12 +36,9 @@ public class AsyncMasterTaskDelayQueueLooper extends 
BaseDaemonThread implements
     private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
 
     @Autowired
-    private MasterConfig masterConfig;
-
+    private MasterAsyncTaskExecutorThreadPool 
masterAsyncTaskExecutorThreadPool;
     private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
 
-    private ExecutorService asyncTaskStateCheckThreadPool;
-
     public AsyncMasterTaskDelayQueueLooper() {
         super("AsyncMasterTaskDelayQueueLooper");
     }
@@ -63,8 +57,6 @@ public class AsyncMasterTaskDelayQueueLooper extends 
BaseDaemonThread implements
 
     @Override
     public void run() {
-        asyncTaskStateCheckThreadPool = 
ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool",
-                masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize());
         while (RUNNING_FLAG.get()) {
             AsyncTaskExecutionContext asyncTaskExecutionContext;
             try {
@@ -86,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends 
BaseDaemonThread implements
                             "Cannot find the taskInstance from 
TaskExecutionContextCacheManager, the task may already been killed, will stop 
the async master task");
                     continue;
                 }
-                asyncTaskStateCheckThreadPool.submit(() -> {
+                masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> 
{
                     final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
                             
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
                     final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
@@ -131,8 +123,5 @@ public class AsyncMasterTaskDelayQueueLooper extends 
BaseDaemonThread implements
             log.warn("The AsyncMasterTaskDelayQueueLooper is not started, will 
not close");
             return;
         }
-        log.info("AsyncMasterTaskDelayQueueLooper closing...");
-        asyncTaskStateCheckThreadPool.shutdown();
-        log.info("AsyncMasterTaskDelayQueueLooper closed...");
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
similarity index 55%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
index 1979b48de5..6c8e2caa73 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
@@ -15,27 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.dispatcher;
+package org.apache.dolphinscheduler.server.master.runner.execute;
 
-import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
+public interface IMasterTaskExecutorThreadPool<T extends MasterTaskExecutor> {
 
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class TaskDispatchFactory {
-
-    @Autowired
-    private MasterTaskDispatcher masterTaskDispatcher;
-
-    @Autowired
-    private WorkerTaskDispatcher workerTaskDispatcher;
-
-    public TaskDispatcher getTaskDispatcher(String taskType) {
-        return TaskUtils.isMasterTask(taskType) ? masterTaskDispatcher : 
workerTaskDispatcher;
-    }
+    boolean submitMasterTaskExecutor(T masterTaskExecutor);
 
+    boolean removeMasterTaskExecutor(T masterTaskExecutor);
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
new file mode 100644
index 0000000000..868b66b6df
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MasterAsyncTaskExecutorThreadPool implements 
IMasterTaskExecutorThreadPool<AsyncMasterTaskExecutor> {
+
+    private final ThreadPoolExecutor threadPoolExecutor;
+
+    public MasterAsyncTaskExecutorThreadPool(MasterConfig masterConfig) {
+        this.threadPoolExecutor = 
ThreadUtils.newDaemonFixedThreadExecutor("MasterAsyncTaskExecutorThreadPool",
+                masterConfig.getMasterSyncTaskExecutorThreadPoolSize());
+    }
+
+    @Override
+    public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor 
asyncMasterTaskExecutor) {
+        synchronized (MasterAsyncTaskExecutorThreadPool.class) {
+            // todo: check if the thread pool is overload
+            threadPoolExecutor.submit(asyncMasterTaskExecutor);
+            return true;
+        }
+    }
+
+    @Override
+    public boolean removeMasterTaskExecutor(AsyncMasterTaskExecutor 
asyncMasterTaskExecutor) {
+        return threadPoolExecutor.remove(asyncMasterTaskExecutor);
+    }
+
+    // todo: remove this method, it's not a good idea to expose the 
ThreadPoolExecutor to out side.
+    ThreadPoolExecutor getThreadPool() {
+        return threadPoolExecutor;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
similarity index 53%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
index d61d058d22..3f683076e6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
@@ -20,32 +20,34 @@ package 
org.apache.dolphinscheduler.server.master.runner.execute;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import lombok.extern.slf4j.Slf4j;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 @Slf4j
 @Component
-public class MasterTaskExecutorThreadPool {
+public class MasterSyncTaskExecutorThreadPool implements 
IMasterTaskExecutorThreadPool<SyncMasterTaskExecutor> {
 
-    @Autowired
-    private MasterConfig masterConfig;
+    private final ThreadPoolExecutor threadPoolExecutor;
 
-    private ListeningExecutorService listeningExecutorService;
-
-    public synchronized void start() {
-        log.info("MasterTaskExecuteRunnableThreadPool starting...");
-        this.listeningExecutorService = 
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
-                "MasterTaskExecuteRunnableThread", 
masterConfig.getMasterTaskExecuteThreadPoolSize()));
-        log.info("MasterTaskExecuteRunnableThreadPool started...");
+    public MasterSyncTaskExecutorThreadPool(MasterConfig masterConfig) {
+        this.threadPoolExecutor = 
ThreadUtils.newDaemonFixedThreadExecutor("MasterSyncTaskExecutorThreadPool",
+                masterConfig.getMasterSyncTaskExecutorThreadPoolSize());
     }
 
-    public void submitMasterTaskExecutor(MasterTaskExecutor 
masterTaskExecutor) {
-        listeningExecutorService.submit(masterTaskExecutor);
+    @Override
+    public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor 
syncMasterTaskExecutor) {
+        synchronized (MasterSyncTaskExecutorThreadPool.class) {
+            // todo: check if the thread pool is overload
+            threadPoolExecutor.submit(syncMasterTaskExecutor);
+            return true;
+        }
     }
 
+    @Override
+    public boolean removeMasterTaskExecutor(SyncMasterTaskExecutor 
syncMasterTaskExecutor) {
+        return threadPoolExecutor.remove(syncMasterTaskExecutor);
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
new file mode 100644
index 0000000000..1f4a916897
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MasterTaskExecutorThreadPoolManager {
+
+    @Autowired
+    private MasterSyncTaskExecutorThreadPool masterSyncTaskExecutorThreadPool;
+
+    @Autowired
+    private MasterAsyncTaskExecutorThreadPool 
masterAsyncTaskExecutorThreadPool;
+
+    public boolean submitMasterTaskExecutor(MasterTaskExecutor 
masterTaskExecutor) {
+        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
+            return masterSyncTaskExecutorThreadPool
+                    .submitMasterTaskExecutor((SyncMasterTaskExecutor) 
masterTaskExecutor);
+        }
+        if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
+            return masterAsyncTaskExecutorThreadPool
+                    .submitMasterTaskExecutor((AsyncMasterTaskExecutor) 
masterTaskExecutor);
+        }
+        throw new IllegalArgumentException("Unknown type of 
MasterTaskExecutor: " + masterTaskExecutor);
+    }
+
+    public boolean removeMasterTaskExecutor(MasterTaskExecutor 
masterTaskExecutor) {
+        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
+            return masterSyncTaskExecutorThreadPool
+                    .removeMasterTaskExecutor((SyncMasterTaskExecutor) 
masterTaskExecutor);
+        }
+        if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
+            return masterAsyncTaskExecutorThreadPool
+                    .removeMasterTaskExecutor((AsyncMasterTaskExecutor) 
masterTaskExecutor);
+        }
+        throw new IllegalArgumentException("Unknown type of 
MasterTaskExecutor: " + masterTaskExecutor);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 6f0419ae97..cb2c7a0e07 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -52,6 +52,6 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator 
implements TaskExe
                     taskInstance.getName(),
                     taskInstance.getDelayTime(), remainTime);
         }
-        
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
+        
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 8aa4c36af2..4203516f42 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -105,13 +105,7 @@ public abstract class AbstractCommandExecutor {
                             TaskCallBack taskCallBack) throws Exception {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
-        if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
-            log.warn(
-                    "Cannot find the taskInstance: {} from 
TaskExecutionContextCacheManager, the task might already been killed",
-                    taskInstanceId);
-            result.setExitStatusCode(EXIT_CODE_KILL);
-            return result;
-        }
+        // todo: we need to use state like JDK Thread to make sure the killed 
task should not be executed
         iShellInterceptorBuilder = iShellInterceptorBuilder
                 .shellDirectory(taskRequest.getExecutePath())
                 .shellName(taskRequest.getTaskAppId());
@@ -155,13 +149,7 @@ public abstract class AbstractCommandExecutor {
 
         // cache processId
         taskRequest.setProcessId(processId);
-        boolean updateTaskExecutionContextStatus =
-                
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
-        if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
-            result.setExitStatusCode(EXIT_CODE_KILL);
-            cancelApplication();
-            return result;
-        }
+
         // print process id
         log.info("process start, process id is: {}", processId);
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
deleted file mode 100644
index dbe39d7042..0000000000
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TaskExecutionContextCacheManager {
-
-    private TaskExecutionContextCacheManager() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    /**
-     * taskInstance cache
-     */
-    private static final Map<Integer, TaskExecutionContext> 
taskRequestContextCache = new ConcurrentHashMap<>();
-
-    /**
-     * get taskInstance by taskInstance id
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return taskInstance
-     */
-
-    public static TaskExecutionContext getByTaskInstanceId(Integer 
taskInstanceId) {
-        return taskRequestContextCache.get(taskInstanceId);
-    }
-
-    /**
-     * cache taskInstance
-     *
-     * @param request request
-     */
-    public static void cacheTaskExecutionContext(TaskExecutionContext request) 
{
-        taskRequestContextCache.put(request.getTaskInstanceId(), request);
-    }
-
-    /**
-     * remove taskInstance by taskInstanceId
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public static void removeByTaskInstanceId(Integer taskInstanceId) {
-        taskRequestContextCache.remove(taskInstanceId);
-    }
-
-    public static boolean updateTaskExecutionContext(TaskExecutionContext 
request) {
-        taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), 
(k, v) -> request);
-        return 
taskRequestContextCache.containsKey(request.getTaskInstanceId());
-    }
-
-    public static Collection<TaskExecutionContext> getAllTaskRequestList() {
-        return taskRequestContextCache.values();
-    }
-}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index f5f794859b..1ce6b12c22 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -21,7 +21,6 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
@@ -39,7 +38,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
@@ -284,12 +282,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
         try {
-            if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
-                result.setExitStatusCode(EXIT_CODE_KILL);
-                return result;
-            }
             if (StringUtils.isEmpty(k8sParameterStr)) {
-                
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
                 return result;
             }
             K8sTaskExecutionContext k8sTaskExecutionContext = 
taskRequest.getK8sTaskExecutionContext();
@@ -371,10 +364,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
 
     public void setTaskStatus(int jobStatus, String taskInstanceId, 
TaskResponse taskResponse) {
         if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
-            if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
 {
-                log.info("[K8sJobExecutor-{}] killed", 
job.getMetadata().getName());
-                taskResponse.setExitStatusCode(EXIT_CODE_KILL);
-            } else if (jobStatus == EXIT_CODE_SUCCESS) {
+            if (jobStatus == EXIT_CODE_SUCCESS) {
                 log.info("[K8sJobExecutor-{}] succeed in k8s", 
job.getMetadata().getName());
                 taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
             } else {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 4bbd29d18e..1e7629acce 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.k8s;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
@@ -90,7 +88,7 @@ public class K8sTaskExecutorTest {
         TaskResponse taskResponse = new TaskResponse();
         k8sTaskExecutor.setJob(job);
         k8sTaskExecutor.setTaskStatus(jobStatus, 
String.valueOf(taskInstanceId), taskResponse);
-        Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, 
taskResponse.getExitStatusCode()));
+        Assertions.assertEquals(0, taskResponse.getExitStatusCode());
     }
     @Test
     public void testWaitTimeoutNormal() {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
index 3f87271015..204b024e19 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.dvc;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -35,7 +34,6 @@ public class DvcTaskTest {
         TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
         
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
 
-        
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
         return taskExecutionContext;
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
index e6df60491d..0f70b67f75 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 
 import java.io.File;
 import java.io.IOException;
@@ -127,7 +126,6 @@ public class KubeflowTaskTest {
         }
         
Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
                 .thenReturn(kubeflowParameters.getClusterYAML());
-        
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
 
         return taskExecutionContext;
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
index cf746799c6..4196e208e7 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinler.plugin.task.mlflow;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters;
 import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask;
@@ -58,7 +57,6 @@ public class MlflowTaskTest {
         String parameters = JSONUtils.toJsonString(mlflowParameters);
         TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
         
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-        
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
         return taskExecutionContext;
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
index 25b975b885..c213021607 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 
 import org.apache.commons.lang3.SystemUtils;
 
@@ -198,7 +197,6 @@ public class PytorchTaskTest {
         String parameters = JSONUtils.toJsonString(pytorchParameters);
         TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
         
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-        
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
         return taskExecutionContext;
     }
 
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index e61409c9d2..e8ae5381fd 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -22,15 +22,14 @@ import 
org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -52,9 +51,6 @@ import 
org.springframework.transaction.annotation.EnableTransactionManagement;
 @Slf4j
 public class WorkerServer implements IStoppable {
 
-    @Autowired
-    private WorkerManagerThread workerManagerThread;
-
     @Autowired
     private WorkerRegistryClient workerRegistryClient;
 
@@ -67,9 +63,6 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
-    @Autowired
-    private GlobalTaskInstanceWaitingQueueLooper 
globalTaskInstanceWaitingQueueLooper;
-
     /**
      * worker server startup, not use web service
      *
@@ -88,10 +81,7 @@ public class WorkerServer implements IStoppable {
         this.workerRegistryClient.setRegistryStoppable(this);
         this.workerRegistryClient.start();
 
-        this.workerManagerThread.start();
-
         this.messageRetryRunner.start();
-        this.globalTaskInstanceWaitingQueueLooper.start();
 
         /*
          * registry hooks, which are called before the process exits
@@ -114,7 +104,13 @@ public class WorkerServer implements IStoppable {
                 WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
                 WorkerRegistryClient closedRegistryClient = 
workerRegistryClient) {
             log.info("Worker server is stopping, current cause : {}", cause);
-            // kill running tasks
+            // todo: we need to remove this method
+            // since for some task, we need to take-over the remote task after 
the worker restart
+            // and if the worker crash, the `killAllRunningTasks` will not be 
execute, this will cause there exist two
+            // kind of situation:
+            // 1. If the worker is stop by kill, the tasks will be kill.
+            // 2. If the worker is stop by kill -9, the tasks will not be kill.
+            // So we don't need to kill the tasks.
             this.killAllRunningTasks();
         } catch (Exception e) {
             log.error("Worker server stop failed, current cause: {}", cause, 
e);
@@ -129,25 +125,25 @@ public class WorkerServer implements IStoppable {
     }
 
     public void killAllRunningTasks() {
-        Collection<TaskExecutionContext> taskRequests = 
TaskExecutionContextCacheManager.getAllTaskRequestList();
-        if (CollectionUtils.isEmpty(taskRequests)) {
+        Collection<WorkerTaskExecutor> workerTaskExecutors = 
WorkerTaskExecutorHolder.getAllTaskExecutor();
+        if (CollectionUtils.isEmpty(workerTaskExecutors)) {
             return;
         }
-        log.info("Worker begin to kill all cache task, task size: {}", 
taskRequests.size());
+        log.info("Worker begin to kill all cache task, task size: {}", 
workerTaskExecutors.size());
         int killNumber = 0;
-        for (TaskExecutionContext taskRequest : taskRequests) {
+        for (WorkerTaskExecutor workerTaskExecutor : workerTaskExecutors) {
             // kill task when it's not finished yet
             try {
-                
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
-                        taskRequest.getTaskInstanceId());
-                if (ProcessUtils.kill(taskRequest)) {
+                TaskExecutionContext taskExecutionContext = 
workerTaskExecutor.getTaskExecutionContext();
+                
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
+                if (ProcessUtils.kill(taskExecutionContext)) {
                     killNumber++;
                 }
             } finally {
                 LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
-        log.info("Worker after kill all cache task, task size: {}, killed 
number: {}", taskRequests.size(),
+        log.info("Worker after kill all cache task, task size: {}, killed 
number: {}", workerTaskExecutors.size(),
                 killNumber);
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 2b9ce2ec5f..28a4d8b214 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -23,6 +23,7 @@ import 
org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import 
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 
 import java.time.Duration;
@@ -92,13 +93,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
         needToRetryMessages.remove(taskInstanceId);
     }
 
-    public void updateMessageHost(int taskInstanceId, String 
messageReceiverHost) {
+    public boolean updateMessageHost(int taskInstanceId, String 
messageReceiverHost) {
         List<TaskInstanceMessage> taskInstanceMessages = 
this.needToRetryMessages.get(taskInstanceId);
-        if (taskInstanceMessages != null) {
-            taskInstanceMessages.forEach(taskInstanceMessage -> {
-                
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
-            });
+        if (CollectionUtils.isEmpty(taskInstanceMessages)) {
+            return false;
         }
+        taskInstanceMessages.forEach(taskInstanceMessage -> {
+            
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
+        });
+        return true;
     }
 
     public void run() {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index ab60de9a77..b383af752d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -30,7 +30,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -55,7 +55,7 @@ public class WorkerRegistryClient implements AutoCloseable {
     private WorkerConfig workerConfig;
 
     @Autowired
-    private WorkerManagerThread workerManagerThread;
+    private WorkerTaskExecutorThreadPool workerManagerThread;
 
     @Autowired
     private RegistryClient registryClient;
@@ -71,7 +71,7 @@ public class WorkerRegistryClient implements AutoCloseable {
         this.workerHeartBeatTask = new WorkerHeartBeatTask(
                 workerConfig,
                 registryClient,
-                () -> workerManagerThread.getWaitSubmitQueueSize());
+                () -> workerManagerThread.getWaitingTaskExecutorSize());
     }
 
     public void start() {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index dc97a2c9a4..b2c3d0b606 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import java.time.Duration;
 
@@ -54,10 +54,7 @@ public class WorkerWaitingStrategy implements 
WorkerConnectStrategy {
     private MessageRetryRunner messageRetryRunner;
 
     @Autowired
-    private WorkerManagerThread workerManagerThread;
-
-    @Autowired
-    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
+    private WorkerTaskExecutorThreadPool workerManagerThread;
 
     @Override
     public void disconnect() {
@@ -121,7 +118,7 @@ public class WorkerWaitingStrategy implements 
WorkerConnectStrategy {
         workerRpcServer.close();
         log.warn("Worker server close the RPC server due to lost connection 
from registry");
         workerManagerThread.clearTask();
-        globalTaskInstanceWaitingQueue.clearTask();
+        WorkerTaskExecutorHolder.clear();
         log.warn("Worker server clear the tasks due to lost connection from 
registry");
         messageRetryRunner.clearMessage();
         log.warn("Worker server clear the retry message due to lost connection 
from registry");
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
index c57ee2efca..51b75aa457 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
@@ -21,12 +21,11 @@ import 
org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,7 +37,7 @@ import org.springframework.stereotype.Component;
 public class StreamingTaskInstanceOperatorImpl implements 
IStreamingTaskInstanceOperator {
 
     @Autowired
-    private WorkerManagerThread workerManager;
+    private WorkerTaskExecutorThreadPool workerManager;
 
     @Override
     public TaskInstanceTriggerSavepointResponse 
triggerSavepoint(TaskInstanceTriggerSavepointRequest 
taskInstanceTriggerSavepointRequest) {
@@ -47,16 +46,10 @@ public class StreamingTaskInstanceOperatorImpl implements 
IStreamingTaskInstance
         try {
             int taskInstanceId = 
taskInstanceTriggerSavepointRequest.getTaskInstanceId();
             LogUtils.setTaskInstanceIdMDC(taskInstanceId);
-            TaskExecutionContext taskExecutionContext =
-                    
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-            if (taskExecutionContext == null) {
-                log.error("Cannot find TaskExecutionContext for taskInstance: 
{}", taskInstanceId);
-                return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
TaskExecutionContext");
-            }
-            WorkerTaskExecutor workerTaskExecutor = 
workerManager.getTaskExecuteThread(taskInstanceId);
+            WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorHolder.get(taskInstanceId);
             if (workerTaskExecutor == null) {
-                log.error("Cannot find WorkerTaskExecuteRunnable for 
taskInstance: {}", taskInstanceId);
-                return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
WorkerTaskExecuteRunnable");
+                log.error("Cannot find WorkerTaskExecutor for taskInstance: 
{}", taskInstanceId);
+                return TaskInstanceTriggerSavepointResponse.fail("Cannot find 
TaskExecutionContext");
             }
             AbstractTask task = workerTaskExecutor.getTask();
             if (task == null) {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
index 24b5477cce..a8a0c5a2b7 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
@@ -30,8 +30,8 @@ import 
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFil
 import 
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
 import 
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
 import 
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
 
 import java.util.List;
 
@@ -64,9 +64,13 @@ public class WorkerLogServiceImpl implements ILogService {
 
     @Override
     public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
-        TaskExecutionContext taskExecutionContext =
-                
TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId());
-        String appInfoPath = taskExecutionContext.getAppInfoPath();
+        String appInfoPath = null;
+        WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId());
+        if (workerTaskExecutor != null) {
+            // todo: remove this kind of logic, and remove get appId method, 
the appId should be send by worker rather
+            // than query by master
+            appInfoPath = 
workerTaskExecutor.getTaskExecutionContext().getAppInfoPath();
+        }
         String logPath = getAppIdRequest.getLogPath();
         List<String> appIds = 
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, 
appInfoPath,
                 PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
deleted file mode 100644
index e1ac97b2d3..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskInstanceWaitingQueue {
-
-    private final WorkerConfig workerConfig;
-
-    private final BlockingQueue<TaskExecutionContext> blockingQueue;
-
-    public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) {
-        this.workerConfig = workerConfig;
-        this.blockingQueue = new 
ArrayBlockingQueue<>(workerConfig.getExecThreads());
-    }
-
-    public boolean addDispatchTask(TaskExecutionContext taskExecutionContext) {
-        if (workerConfig.getTaskExecuteThreadsFullPolicy() == 
TaskExecuteThreadsFullPolicy.CONTINUE) {
-            return blockingQueue.offer(taskExecutionContext);
-        }
-
-        if (blockingQueue.size() > getQueueSize()) {
-            log.warn("Wait submit queue is full, will retry submit task 
later");
-            WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
-            return false;
-        }
-        return blockingQueue.offer(taskExecutionContext);
-    }
-
-    public TaskExecutionContext take() throws InterruptedException {
-        return blockingQueue.take();
-    }
-
-    public void clearTask() {
-        blockingQueue.clear();
-    }
-
-    public int getQueueSize() {
-        return workerConfig.getExecThreads();
-    }
-
-}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
deleted file mode 100644
index 6e501a1be6..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread {
-
-    @Autowired
-    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
-
-    @Autowired
-    private WorkerConfig workerConfig;
-
-    @Autowired
-    private WorkerMessageSender workerMessageSender;
-
-    @Autowired
-    private TaskPluginManager taskPluginManager;
-
-    @Autowired
-    private WorkerManagerThread workerManager;
-
-    @Autowired(required = false)
-    private StorageOperate storageOperate;
-
-    @Autowired
-    private WorkerRegistryClient workerRegistryClient;
-
-    protected GlobalTaskInstanceWaitingQueueLooper() {
-        super("GlobalTaskDispatchQueueLooper");
-    }
-
-    public synchronized void start() {
-        log.info("GlobalTaskDispatchQueueLooper starting");
-        super.start();
-        log.info("GlobalTaskDispatchQueueLooper started");
-    }
-
-    public void run() {
-        while (true) {
-            try {
-                TaskExecutionContext taskExecutionContext = 
globalTaskInstanceWaitingQueue.take();
-                
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
-                
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
-
-                WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorFactoryBuilder
-                        .createWorkerTaskExecutorFactory(
-                                taskExecutionContext,
-                                workerConfig,
-                                workerMessageSender,
-                                taskPluginManager,
-                                storageOperate,
-                                workerRegistryClient)
-                        .createWorkerTaskExecutor();
-                if (workerManager.offer(workerTaskExecutor)) {
-                    log.info("Success submit WorkerDelayTaskExecuteRunnable to 
WorkerManagerThread's waiting queue");
-                }
-            } catch (InterruptedException e) {
-                log.error("GlobalTaskDispatchQueueLooper interrupted");
-                Thread.currentThread().interrupt();
-                break;
-            } catch (Exception ex) {
-                log.error("GlobalTaskDispatchQueueLooper error", ex);
-            } finally {
-                LogUtils.removeTaskInstanceIdMDC();
-                LogUtils.removeTaskInstanceLogFullPathMDC();
-            }
-        }
-    }
-
-}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
index be8a62a6c8..aeb7d658d1 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
 import 
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 
@@ -42,15 +41,7 @@ public class TaskCallbackImpl implements TaskCallBack {
 
     @Override
     public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
-        TaskExecutionContext taskExecutionContext =
-                
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-        if (taskExecutionContext == null) {
-            log.error("task execution context is empty, taskInstanceId: {}, 
applicationInfo:{}", taskInstanceId,
-                    applicationInfo);
-            return;
-        }
-
-        log.info("send remote application info {}", applicationInfo);
+        // todo: use listener
         taskExecutionContext.setAppIds(applicationInfo.getAppIds());
         workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                 
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
@@ -58,13 +49,6 @@ public class TaskCallbackImpl implements TaskCallBack {
 
     @Override
     public void updateTaskInstanceInfo(int taskInstanceId) {
-        TaskExecutionContext taskExecutionContext =
-                
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-        if (taskExecutionContext == null) {
-            log.error("task execution context is empty, taskInstanceId: {}", 
taskInstanceId);
-            return;
-        }
-
         workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                 
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
deleted file mode 100644
index 2a6b7feec2..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import lombok.extern.slf4j.Slf4j;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-@Slf4j
-public class WorkerExecService {
-
-    private final ListeningExecutorService listeningExecutorService;
-
-    private final ExecutorService execService;
-
-    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> 
taskExecuteThreadMap;
-
-    public WorkerExecService(ExecutorService execService,
-                             ConcurrentHashMap<Integer, WorkerTaskExecutor> 
taskExecuteThreadMap) {
-        this.execService = execService;
-        this.listeningExecutorService = 
MoreExecutors.listeningDecorator(this.execService);
-        this.taskExecuteThreadMap = taskExecuteThreadMap;
-        
WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
-    }
-
-    public void submit(final WorkerTaskExecutor taskExecuteThread) {
-        
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
 taskExecuteThread);
-        ListenableFuture future = 
this.listeningExecutorService.submit(taskExecuteThread);
-        FutureCallback futureCallback = new FutureCallback() {
-
-            @Override
-            public void onSuccess(Object o) {
-                
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                log.error("task execute failed, processInstanceId:{}, 
taskInstanceId:{}",
-                        
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
-                        
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
-                        throwable);
-                
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
-            }
-        };
-        Futures.addCallback(future, futureCallback, 
this.listeningExecutorService);
-    }
-
-    /**
-     * get thread pool queue size
-     *
-     * @return queue size
-     */
-    public int getThreadPoolQueueSize() {
-        return ((ThreadPoolExecutor) this.execService).getQueue().size();
-    }
-
-    public int getActiveExecThreadCount() {
-        return ((ThreadPoolExecutor) this.execService).getActiveCount();
-    }
-
-    public Map<Integer, WorkerTaskExecutor> getTaskExecuteThreadMap() {
-        return taskExecuteThreadMap;
-    }
-
-}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
deleted file mode 100644
index 4b666cfb20..0000000000
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-/**
- * Manage tasks
- */
-@Component
-@Slf4j
-public class WorkerManagerThread implements Runnable {
-
-    private final BlockingQueue<WorkerTaskExecutor> waitSubmitQueue;
-    private final WorkerExecService workerExecService;
-
-    private final int workerExecThreads;
-
-    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> 
taskExecuteThreadMap = new ConcurrentHashMap<>();
-
-    public WorkerManagerThread(WorkerConfig workerConfig) {
-        workerExecThreads = workerConfig.getExecThreads();
-        this.waitSubmitQueue = new LinkedBlockingQueue<>();
-        workerExecService = new WorkerExecService(
-                
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", 
workerConfig.getExecThreads()),
-                taskExecuteThreadMap);
-    }
-
-    public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer 
taskInstanceId) {
-        return taskExecuteThreadMap.get(taskInstanceId);
-    }
-
-    /**
-     * get wait submit queue size
-     *
-     * @return queue size
-     */
-    public int getWaitSubmitQueueSize() {
-        return waitSubmitQueue.size();
-    }
-
-    /**
-     * get thread pool queue size
-     *
-     * @return queue size
-     */
-    public int getThreadPoolQueueSize() {
-        return workerExecService.getThreadPoolQueueSize();
-    }
-
-    /**
-     * Kill tasks that have not been executed, like delay task
-     * then send Response to Master, update the execution status of task 
instance
-     */
-    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
-        waitSubmitQueue.stream()
-                .filter(taskExecuteThread -> 
taskExecuteThread.getTaskExecutionContext()
-                        .getTaskInstanceId() == taskInstanceId)
-                .forEach(waitSubmitQueue::remove);
-    }
-
-    public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) {
-        return waitSubmitQueue.add(workerDelayTaskExecuteRunnable);
-    }
-
-    public void start() {
-        log.info("Worker manager thread starting");
-        Thread thread = new Thread(this, this.getClass().getName());
-        thread.setDaemon(true);
-        thread.start();
-        log.info("Worker manager thread started");
-    }
-
-    @Override
-    public void run() {
-        
WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
-        
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
-        
WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
-        
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize);
-        
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount);
-
-        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
-        while (!ServerLifeCycleManager.isStopped()) {
-            try {
-                if (!ServerLifeCycleManager.isRunning()) {
-                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-                }
-                if (this.getThreadPoolQueueSize() <= workerExecThreads) {
-                    WorkerTaskExecutor workerTaskExecutor = 
waitSubmitQueue.take();
-                    workerExecService.submit(workerTaskExecutor);
-                } else {
-                    WorkerServerMetrics.incWorkerOverloadCount();
-                    log.info("Exec queue is full, waiting submit queue {}, 
waiting exec queue size {}",
-                            this.getWaitSubmitQueueSize(), 
this.getThreadPoolQueueSize());
-                    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
-                }
-            } catch (Exception e) {
-                log.error("An unexpected interrupt is happened, "
-                        + "the exception will be ignored and this thread will 
continue to run", e);
-            }
-        }
-    }
-
-    public void clearTask() {
-        waitSubmitQueue.clear();
-        
workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable
 -> {
-            int taskInstanceId = 
workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
-            try {
-                workerTaskExecuteRunnable.cancelTask();
-                log.info("Cancel the taskInstance in worker  {}", 
taskInstanceId);
-            } catch (Exception ex) {
-                log.error("Cancel the taskInstance error {}", taskInstanceId, 
ex);
-            } finally {
-                
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
-            }
-        });
-        workerExecService.getTaskExecuteThreadMap().clear();
-    }
-}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index e47a28d264..f713605a48 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -39,7 +39,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -108,7 +107,7 @@ public abstract class WorkerTaskExecutor implements 
Runnable {
 
         sendTaskResult();
 
-        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
         log.info("Remove the current task execute context from worker cache");
         clearTaskExecPathIfNeeded();
 
@@ -118,7 +117,7 @@ public abstract class WorkerTaskExecutor implements 
Runnable {
         if (cancelTask()) {
             log.info("Cancel the task successfully");
         }
-        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
         
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
         taskExecutionContext.setEndTime(System.currentTimeMillis());
         workerMessageSender.sendMessageWithRetry(taskExecutionContext,
@@ -128,7 +127,7 @@ public abstract class WorkerTaskExecutor implements 
Runnable {
 
     }
 
-    public boolean cancelTask() {
+    protected boolean cancelTask() {
         // cancel the task
         if (task == null) {
             return true;
@@ -157,7 +156,7 @@ public abstract class WorkerTaskExecutor implements 
Runnable {
             if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
                 
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
                 taskExecutionContext.setEndTime(System.currentTimeMillis());
-                
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
                 workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                         
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
                 log.info(
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index c2efdc9c7a..a9c2948482 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -24,20 +24,31 @@ import 
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 
-import javax.annotation.Nullable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
-import lombok.NonNull;
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
+@Component
 public class WorkerTaskExecutorFactoryBuilder {
 
-    public static WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> 
createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext 
taskExecutionContext,
-                                                                               
                           @NonNull WorkerConfig workerConfig,
-                                                                               
                           @NonNull WorkerMessageSender workerMessageSender,
-                                                                               
                           @NonNull TaskPluginManager taskPluginManager,
-                                                                               
                           @Nullable StorageOperate storageOperate,
-                                                                               
                           @NonNull WorkerRegistryClient workerRegistryClient) {
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Autowired
+    private WorkerMessageSender workerMessageSender;
+
+    @Autowired
+    private TaskPluginManager taskPluginManager;
+
+    @Autowired
+    private WorkerTaskExecutorThreadPool workerManager;
+
+    @Autowired(required = false)
+    private StorageOperate storageOperate;
+
+    @Autowired
+    private WorkerRegistryClient workerRegistryClient;
+
+    public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> 
createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
         return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
                 workerConfig,
                 workerMessageSender,
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
new file mode 100644
index 0000000000..fa07dbfde6
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to store all running and waiting {@link WorkerTaskExecutor}. If the 
task has been finished, it will be removed from the map.
+ */
+public class WorkerTaskExecutorHolder {
+
+    private static final Map<Integer, WorkerTaskExecutor> 
workerTaskExecutorMap = new HashMap<>();
+
+    public static void put(WorkerTaskExecutor workerTaskExecutor) {
+        int taskInstanceId = 
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId();
+        if (workerTaskExecutorMap.containsKey(taskInstanceId)) {
+            throw new IllegalArgumentException("TaskInstance: " + 
taskInstanceId + " already exists");
+        }
+        workerTaskExecutorMap.put(taskInstanceId, workerTaskExecutor);
+    }
+
+    public static WorkerTaskExecutor get(int taskInstanceId) {
+        return workerTaskExecutorMap.get(taskInstanceId);
+    }
+
+    public static WorkerTaskExecutor remove(int taskInstanceId) {
+        return workerTaskExecutorMap.remove(taskInstanceId);
+    }
+
+    public static void clear() {
+        workerTaskExecutorMap.clear();
+    }
+
+    public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
+        return workerTaskExecutorMap.values();
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
new file mode 100644
index 0000000000..28588c0dbf
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import 
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class WorkerTaskExecutorThreadPool {
+
+    private final ThreadPoolExecutor threadPoolExecutor;
+
+    private final WorkerConfig workerConfig;
+
+    public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
+        this.threadPoolExecutor =
+                
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", 
workerConfig.getExecThreads());
+        this.workerConfig = workerConfig;
+
+        
WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
+        
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
+        
WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
+        WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(
+                () -> threadPoolExecutor.getQueue().size() - 
threadPoolExecutor.getActiveCount());
+        
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount);
+    }
+
+    public boolean submitWorkerTaskExecutor(WorkerTaskExecutor 
workerTaskExecutor) {
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            if 
(TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy()))
 {
+                WorkerTaskExecutorHolder.put(workerTaskExecutor);
+                threadPoolExecutor.submit(workerTaskExecutor);
+                return true;
+            }
+            if (isOverload()) {
+                log.warn("WorkerTaskExecutorThreadPool is overload, cannot 
submit new WorkerTaskExecutor");
+                WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
+                return false;
+            }
+            WorkerTaskExecutorHolder.put(workerTaskExecutor);
+            threadPoolExecutor.submit(workerTaskExecutor);
+            return true;
+        }
+    }
+
+    public boolean isOverload() {
+        return threadPoolExecutor.getQueue().size() > 0;
+    }
+
+    public int getWaitingTaskExecutorSize() {
+        return threadPoolExecutor.getQueue().size();
+    }
+
+    public int getRunningTaskExecutorSize() {
+        return threadPoolExecutor.getActiveCount();
+    }
+
+    /**
+     * Kill tasks that have not been executed, e.g. waiting in the queue
+     */
+    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorHolder.get(taskInstanceId);
+            threadPoolExecutor.remove(workerTaskExecutor);
+        }
+    }
+
+    public void clearTask() {
+        threadPoolExecutor.getQueue().clear();
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 4d3ebc9aa9..8c6825df63 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -20,11 +20,12 @@ package 
org.apache.dolphinscheduler.server.worker.runner.operator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
-import 
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,15 +42,16 @@ public class TaskInstanceDispatchOperationFunction
     private WorkerConfig workerConfig;
 
     @Autowired
-    private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
+    private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;
+
+    @Autowired
+    private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
 
     @Override
     public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest 
taskInstanceDispatchRequest) {
         log.info("Receive TaskInstanceDispatchRequest: {}", 
taskInstanceDispatchRequest);
         TaskExecutionContext taskExecutionContext = 
taskInstanceDispatchRequest.getTaskExecutionContext();
         try {
-            // set cache, it will be used when kill task
-            
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
             taskExecutionContext.setHost(workerConfig.getWorkerAddress());
             
taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
 
@@ -57,9 +59,11 @@ public class TaskInstanceDispatchOperationFunction
                     taskExecutionContext.getTaskInstanceId());
             
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
 
-            if 
(!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
-                log.error("Submit task: {} to wait queue error, current queue 
size: {} is full",
-                        taskExecutionContext.getTaskName(), 
workerConfig.getExecThreads());
+            WorkerTaskExecutor workerTaskExecutor = 
workerTaskExecutorFactoryBuilder
+                    
.createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor();
+            // todo: hold the workerTaskExecutor
+            if 
(!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
+                log.info("Submit task: {} to wait queue failed", 
taskExecutionContext.getTaskName());
                 return 
TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
                         "WorkerManagerThread is full");
             } else {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index c5f4ffb78b..69e3994a90 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -23,13 +23,13 @@ import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRe
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -45,7 +45,7 @@ public class TaskInstanceKillOperationFunction
             ITaskInstanceOperationFunction<TaskInstanceKillRequest, 
TaskInstanceKillResponse> {
 
     @Autowired
-    private WorkerManagerThread workerManager;
+    private WorkerTaskExecutorThreadPool workerManager;
 
     @Autowired
     private MessageRetryRunner messageRetryRunner;
@@ -57,22 +57,24 @@ public class TaskInstanceKillOperationFunction
         int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
         try {
             LogUtils.setTaskInstanceIdMDC(taskInstanceId);
-            TaskExecutionContext taskExecutionContext =
-                    
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-            if (taskExecutionContext == null) {
-                log.error("Cannot find TaskExecutionContext for taskInstance: 
{}", taskInstanceId);
-                return TaskInstanceKillResponse.fail("Cannot find 
TaskExecutionContext");
+            WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorHolder.get(taskInstanceId);
+            if (workerTaskExecutor == null) {
+                log.error("Cannot find WorkerTaskExecutor for taskInstance: 
{}", taskInstanceId);
+                return TaskInstanceKillResponse.fail("Cannot find 
WorkerTaskExecutor");
             }
+            TaskExecutionContext taskExecutionContext = 
workerTaskExecutor.getTaskExecutionContext();
+
             
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
 
             boolean result = doKill(taskExecutionContext);
-            this.cancelApplication(taskInstanceId);
+            this.cancelApplication(workerTaskExecutor);
 
             int processId = taskExecutionContext.getProcessId();
             if (processId == 0) {
                 
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
                 
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
-                
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+                // todo: the task might be executed, but the processId is 0
+                WorkerTaskExecutorHolder.remove(taskInstanceId);
                 log.info("The task has not been executed and has been 
cancelled, task id:{}", taskInstanceId);
                 return TaskInstanceKillResponse.success(taskExecutionContext);
             }
@@ -80,7 +82,7 @@ public class TaskInstanceKillOperationFunction
             taskExecutionContext
                     .setCurrentExecutionStatus(result ? 
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
 
-            
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+            
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
             
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
             return TaskInstanceKillResponse.success(taskExecutionContext);
         } finally {
@@ -102,15 +104,11 @@ public class TaskInstanceKillOperationFunction
         return processFlag;
     }
 
-    protected void cancelApplication(int taskInstanceId) {
-        WorkerTaskExecutor workerTaskExecutor = 
workerManager.getTaskExecuteThread(taskInstanceId);
-        if (workerTaskExecutor == null) {
-            log.warn("taskExecuteThread not found, taskInstanceId:{}", 
taskInstanceId);
-            return;
-        }
+    protected void cancelApplication(WorkerTaskExecutor workerTaskExecutor) {
         AbstractTask task = workerTaskExecutor.getTask();
         if (task == null) {
-            log.warn("task not found, taskInstanceId:{}", taskInstanceId);
+            log.warn("task not found, taskInstanceId: {}",
+                    
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
             return;
         }
         try {
@@ -118,7 +116,8 @@ public class TaskInstanceKillOperationFunction
         } catch (Exception e) {
             log.error("kill task error", e);
         }
-        log.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+        log.info("kill task by cancelApplication, taskInstanceId: {}",
+                
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
     }
 
     protected boolean killProcess(String tenantCode, Integer processId) {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
index 6d31feff7e..7485b9230f 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
@@ -20,9 +20,10 @@ package 
org.apache.dolphinscheduler.server.worker.runner.operator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,21 +48,29 @@ public class UpdateWorkflowHostOperationFunction
             LogUtils.setTaskInstanceIdMDC(taskInstanceId);
             log.info("Received UpdateWorkflowHostRequest: {}", 
updateWorkflowHostRequest);
 
-            TaskExecutionContext taskExecutionContext =
-                    
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-            if (taskExecutionContext == null) {
-                log.error("Cannot find the taskExecutionContext for 
taskInstance : {}", taskInstanceId);
-                return UpdateWorkflowHostResponse.failed("Cannot find the 
taskExecutionContext");
+            boolean updateWorkerTaskExecutor = 
updateHostInWorkflowTaskExecutor(taskInstanceId, workflowHost);
+            boolean updateMessage = updateHostInMessage(taskInstanceId, 
workflowHost);
+            if (updateWorkerTaskExecutor || updateMessage) {
+                return UpdateWorkflowHostResponse.success();
             }
-
-            
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
-            taskExecutionContext.setWorkflowInstanceHost(workflowHost);
-            messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost);
-            log.info("Success update workflow host: {} for taskInstance: {}", 
workflowHost, taskInstanceId);
-            return UpdateWorkflowHostResponse.success();
+            return UpdateWorkflowHostResponse.failed("The taskInstance is not 
in the worker");
         } finally {
             LogUtils.removeTaskInstanceIdMDC();
             LogUtils.removeTaskInstanceLogFullPathMDC();
         }
     }
+
+    private boolean updateHostInWorkflowTaskExecutor(int taskInstanceId, 
String workflowHost) {
+        WorkerTaskExecutor workerTaskExecutor = 
WorkerTaskExecutorHolder.get(taskInstanceId);
+        if (workerTaskExecutor == null) {
+            return false;
+        }
+        TaskExecutionContext taskExecutionContext = 
workerTaskExecutor.getTaskExecutionContext();
+        taskExecutionContext.setWorkflowInstanceHost(workflowHost);
+        return true;
+    }
+
+    private boolean updateHostInMessage(int taskInstanceId, String 
workflowHost) {
+        return messageRetryRunner.updateMessageHost(taskInstanceId, 
workflowHost);
+    }
 }
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index d55ccc25af..e6043486f5 100644
--- 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
 import java.time.Duration;
 import java.util.Set;
@@ -67,7 +67,7 @@ public class WorkerRegistryClientTest {
     private ScheduledExecutorService heartBeatExecutor;
 
     @Mock
-    private WorkerManagerThread workerManagerThread;
+    private WorkerTaskExecutorThreadPool workerManagerThread;
 
     @Mock
     private WorkerConnectStrategy workerConnectStrategy;
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
new file mode 100644
index 0000000000..988f1f7fec
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import 
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class WorkerTaskExecutorThreadPoolTest {
+
+    @Test
+    public void testIsOverload() {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setExecThreads(1);
+        
workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
+        WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new 
WorkerTaskExecutorThreadPool(workerConfig);
+        // submit 100 task, the thread pool size is 1
+        // assert the overload should be true
+        // assert the submitQueue should be 99
+        for (int i = 0; i < 100; i++) {
+            boolean submitResult =
+                    workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new 
MockWorkerTaskExecutor(() -> {
+                        try {
+                            Thread.sleep(10_000L);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }));
+            Assertions.assertTrue(submitResult);
+        }
+        Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
+        Assertions.assertEquals(99, 
workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
+        Assertions.assertEquals(1, 
workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
+    }
+
+    static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
+
+        private final Runnable runnable;
+
+        protected MockWorkerTaskExecutor(Runnable runnable) {
+            super(TaskExecutionContext.builder().taskInstanceId((int) 
System.nanoTime()).build(), new WorkerConfig(),
+                    new WorkerMessageSender(), new TaskPluginManager(), new 
StorageOperate() {
+
+                        @Override
+                        public void createTenantDirIfNotExists(String 
tenantCode) {
+
+                        }
+
+                        @Override
+                        public String getResDir(String tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getUdfDir(String tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean mkdir(String tenantCode, String path) 
throws IOException {
+                            return false;
+                        }
+
+                        @Override
+                        public String getResourceFullName(String tenantCode, 
String fileName) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getResourceFileName(String tenantCode, 
String fullName) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getFileName(ResourceType resourceType, 
String tenantCode, String fileName) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean exists(String fullName) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean delete(String filePath, boolean 
recursive) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean delete(String filePath, List<String> 
childrenPathArray,
+                                              boolean recursive) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean copy(String srcPath, String dstPath, 
boolean deleteSource,
+                                            boolean overwrite) {
+                            return false;
+                        }
+
+                        @Override
+                        public String getDir(ResourceType resourceType, String 
tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean upload(String tenantCode, String 
srcFile, String dstPath, boolean deleteSource,
+                                              boolean overwrite) {
+                            return false;
+                        }
+
+                        @Override
+                        public void download(String srcFilePath, String 
dstFile, boolean overwrite) {
+
+                        }
+
+                        @Override
+                        public List<String> vimFile(String tenantCode, String 
filePath, int skipLineNums,
+                                                    int limit) {
+                            return null;
+                        }
+
+                        @Override
+                        public void deleteTenant(String tenantCode) {
+
+                        }
+
+                        @Override
+                        public ResUploadType returnStorageType() {
+                            return null;
+                        }
+
+                        @Override
+                        public List<StorageEntity> 
listFilesStatusRecursively(String path, String defaultPath,
+                                                                              
String tenantCode, ResourceType type) {
+                            return null;
+                        }
+
+                        @Override
+                        public List<StorageEntity> listFilesStatus(String 
path, String defaultPath, String tenantCode,
+                                                                   
ResourceType type) throws Exception {
+                            return null;
+                        }
+
+                        @Override
+                        public StorageEntity getFileStatus(String path, String 
defaultPath, String tenantCode,
+                                                           ResourceType type) 
throws Exception {
+                            return null;
+                        }
+                    }, new WorkerRegistryClient());
+            this.runnable = runnable;
+        }
+
+        @Override
+        public void run() {
+            executeTask(new TaskCallbackImpl(null, null));
+        }
+
+        @Override
+        protected void executeTask(TaskCallBack taskCallBack) {
+            runnable.run();
+        }
+    }
+
+}

Reply via email to