This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new deca0a7ea1 Fix master memory leak due to 
MasterTaskExecuteRunnableDoesn't be removed (#14162)
deca0a7ea1 is described below

commit deca0a7ea1e5e2a34f2eb59e550d02d4d2cf368e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 24 13:49:32 2023 +0800

    Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed 
(#14162)
    
    * Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed
    
    * Update 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../processor/MasterTaskDispatchProcessor.java     |  1 +
 .../master/processor/MasterTaskKillProcessor.java  | 10 +--
 .../master/processor/MasterTaskPauseProcessor.java |  8 +-
 ...erDelayTaskExecuteRunnableDelayQueueLooper.java |  3 +
 .../MasterTaskExecuteRunnableThreadPool.java       | 89 ----------------------
 .../execute/AsyncTaskCallbackFunctionImpl.java     |  6 +-
 .../runner/execute/MasterTaskExecuteRunnable.java  |  6 +-
 .../execute/MasterTaskExecuteRunnableHolder.java   | 45 +++++++++++
 .../MasterTaskExecuteRunnableThreadPool.java       | 51 +++++++++++++
 .../SyncMasterDelayTaskExecuteRunnable.java        |  2 +
 .../runner/operator/TaskTimeoutOperator.java       | 10 +--
 11 files changed, 118 insertions(+), 113 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
index 009a684e92..e70199152a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
@@ -99,6 +99,7 @@ public class MasterTaskDispatchProcessor implements 
MasterRpcProcessor {
             }
         } catch (Exception ex) {
             log.error("Handle task dispatch request error, command: {}", 
taskDispatchRequest, ex);
+            
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
             sendDispatchFailedResult(channel, message, taskExecutionContext, 
ex);
         } finally {
             LogUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
index 1ad56cedd3..37a61e6246 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
@@ -26,8 +26,8 @@ import 
org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
 import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
 import 
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import 
org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
 
 import lombok.extern.slf4j.Slf4j;
@@ -41,9 +41,6 @@ import io.netty.channel.Channel;
 @Component
 public class MasterTaskKillProcessor implements MasterRpcProcessor {
 
-    @Autowired
-    private MasterTaskExecuteRunnableThreadPool 
masterTaskExecuteRunnableThreadPool;
-
     @Autowired
     private MasterDelayTaskExecuteRunnableDelayQueue 
masterDelayTaskExecuteRunnableDelayQueue;
 
@@ -60,7 +57,7 @@ public class MasterTaskKillProcessor implements 
MasterRpcProcessor {
                 return;
             }
             MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-                    
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskInstanceId);
+                    
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
             if (masterTaskExecuteRunnable == null) {
                 log.error("Cannot find the MasterTaskExecuteRunnable, this 
task may already been killed");
                 return;
@@ -71,6 +68,9 @@ public class MasterTaskKillProcessor implements 
MasterRpcProcessor {
                         
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
             } catch (MasterTaskExecuteException e) {
                 log.error("Cancel MasterTaskExecuteRunnable failed ", e);
+            } finally {
+                
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
+                
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
             }
         }
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
index 5b7176c18f..41c9c50d69 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
@@ -25,12 +25,11 @@ import 
org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.dolphinscheduler.remote.command.task.TaskPauseRequest;
 import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import 
org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
@@ -39,14 +38,11 @@ import io.netty.channel.Channel;
 @Component
 public class MasterTaskPauseProcessor implements MasterRpcProcessor {
 
-    @Autowired
-    private MasterTaskExecuteRunnableThreadPool 
masterTaskExecuteRunnableThreadPool;
-
     @Override
     public void process(Channel channel, Message message) {
         TaskPauseRequest taskPauseRequest = 
JSONUtils.parseObject(message.getBody(), TaskPauseRequest.class);
         MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-                
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
+                
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
         if (masterTaskExecuteRunnable == null) {
             log.info("Cannot find the MasterTaskExecuteRunnable");
             return;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
index 3b429e5d76..557f2ca447 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -62,6 +64,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper 
extends BaseDaemonTh
                 final MasterDelayTaskExecuteRunnable 
masterDelayTaskExecuteRunnable =
                         
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
                 
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
+                
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
             } catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
                 log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has 
been interrupted, will stop loop");
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
deleted file mode 100644
index 86c95308d8..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-@Slf4j
-@Component
-public class MasterTaskExecuteRunnableThreadPool {
-
-    @Autowired
-    private MasterConfig masterConfig;
-
-    private static final Map<Integer, MasterTaskExecuteRunnable> 
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
-
-    private ListeningExecutorService listeningExecutorService;
-
-    public synchronized void start() {
-        log.info("MasterTaskExecuteRunnableThreadPool starting...");
-        this.listeningExecutorService = 
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
-                "MasterTaskExecuteRunnableThread", 
masterConfig.getMasterTaskExecuteThreadPoolSize()));
-        log.info("MasterTaskExecuteRunnableThreadPool started...");
-    }
-
-    public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
-        ListenableFuture<?> future = 
listeningExecutorService.submit(masterTaskExecuteRunnable);
-        Futures.addCallback(future, new 
MasterTaskExecuteCallback(masterTaskExecuteRunnable),
-                this.listeningExecutorService);
-        
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
-                masterTaskExecuteRunnable);
-    }
-
-    public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer 
taskInstanceId) {
-        return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
-    }
-
-    private static class MasterTaskExecuteCallback implements FutureCallback {
-
-        private MasterTaskExecuteRunnable masterTaskExecuteRunnable;
-
-        public MasterTaskExecuteCallback(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
-            this.masterTaskExecuteRunnable = masterTaskExecuteRunnable;
-        }
-
-        @Override
-        public void onSuccess(Object result) {
-            log.info("MasterTaskExecuteRunnable execute success, will remove 
this task");
-            
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
-        }
-
-        @Override
-        public void onFailure(Throwable t) {
-            log.info("MasterTaskExecuteRunnable execute failed, will remove 
this task");
-            
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
-        }
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
index cb5d07fc20..e9d4fe4430 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.runner.execute;
 
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
 
@@ -54,8 +53,9 @@ public class AsyncTaskCallbackFunctionImpl implements 
AsyncTaskCallbackFunction
 
     private void executeFinished() {
         TaskInstanceLogHeader.printFinalizeTaskHeader();
-        TaskExecutionContextCacheManager.removeByTaskInstanceId(
-                
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
+        int taskInstanceId = 
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+        
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
+        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
         log.info("Task execute finished, removed the TaskExecutionContext");
         asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
         log.info(
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
index 973baf978e..e9a9e8dae7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
@@ -22,7 +22,6 @@ import static 
ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
 import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -59,7 +58,7 @@ public abstract class MasterTaskExecuteRunnable implements 
Runnable {
     protected void afterThrowing(Throwable throwable) {
         try {
             cancelTask();
-            log.info("Get a exception when execute the task, canceled the 
task");
+            log.error("Get a exception when execute the task, canceled the 
task", throwable);
         } catch (Exception e) {
             log.error("Cancel task failed,", e);
         }
@@ -68,7 +67,8 @@ public abstract class MasterTaskExecuteRunnable implements 
Runnable {
         log.info(
                 "Get a exception when execute the task, sent the task execute 
result to master, the current task execute result is {}",
                 taskExecutionContext.getCurrentExecutionStatus());
-        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
+        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
         log.info("Get a exception when execute the task, removed the 
TaskExecutionContext");
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
new file mode 100644
index 0000000000..6b29897611
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@UtilityClass
+public class MasterTaskExecuteRunnableHolder {
+
+    private static final Map<Integer, MasterTaskExecuteRunnable> 
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
+
+    public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
+        
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
+                masterTaskExecuteRunnable);
+    }
+
+    public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer 
taskInstanceId) {
+        return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
+    }
+
+    public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) {
+        SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
new file mode 100644
index 0000000000..4f542556e4
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+@Slf4j
+@Component
+public class MasterTaskExecuteRunnableThreadPool {
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    private ListeningExecutorService listeningExecutorService;
+
+    public synchronized void start() {
+        log.info("MasterTaskExecuteRunnableThreadPool starting...");
+        this.listeningExecutorService = 
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
+                "MasterTaskExecuteRunnableThread", 
masterConfig.getMasterTaskExecuteThreadPoolSize()));
+        log.info("MasterTaskExecuteRunnableThreadPool started...");
+    }
+
+    public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable 
masterTaskExecuteRunnable) {
+        listeningExecutorService.submit(masterTaskExecuteRunnable);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
index 03d3c0ea6f..04f9321c08 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
@@ -55,6 +55,8 @@ public class SyncMasterDelayTaskExecuteRunnable extends 
MasterDelayTaskExecuteRu
                 "Execute task finished, will send the task execute result to 
master, the current task execute result is {}",
                 taskExecutionContext.getCurrentExecutionStatus().name());
         closeLogAppender();
+        
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
+        
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
index 741a681d8a..17de1e9539 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
@@ -19,12 +19,9 @@ package 
org.apache.dolphinscheduler.server.master.runner.operator;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
-import java.util.Date;
-
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -51,9 +48,8 @@ public class TaskTimeoutOperator implements TaskOperator {
                     taskInstance.getName(), taskTimeoutStrategy.name());
             return;
         }
-        log.info("TaskInstance: {} timeout, will kill the task instance", 
taskInstance.getName());
-        taskInstance.setState(TaskExecutionStatus.FAILURE);
-        taskInstance.setEndTime(new Date());
-        taskInstanceDao.upsertTaskInstance(taskInstance);
+        taskExecuteRunnable.kill();
+        log.info("TaskInstance: {} timeout, killed the task instance", 
taskInstance.getName());
+
     }
 }

Reply via email to