This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new deca0a7ea1 Fix master memory leak due to
MasterTaskExecuteRunnable not being removed (#14162)
deca0a7ea1 is described below
commit deca0a7ea1e5e2a34f2eb59e550d02d4d2cf368e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 24 13:49:32 2023 +0800
Fix master memory leak due to MasterTaskExecuteRunnable not being removed
(#14162)
* Fix master memory leak due to MasterTaskExecuteRunnable not being removed
* Update
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
Co-authored-by: caishunfeng <[email protected]>
---
.../processor/MasterTaskDispatchProcessor.java | 1 +
.../master/processor/MasterTaskKillProcessor.java | 10 +--
.../master/processor/MasterTaskPauseProcessor.java | 8 +-
...erDelayTaskExecuteRunnableDelayQueueLooper.java | 3 +
.../MasterTaskExecuteRunnableThreadPool.java | 89 ----------------------
.../execute/AsyncTaskCallbackFunctionImpl.java | 6 +-
.../runner/execute/MasterTaskExecuteRunnable.java | 6 +-
.../execute/MasterTaskExecuteRunnableHolder.java | 45 +++++++++++
.../MasterTaskExecuteRunnableThreadPool.java | 51 +++++++++++++
.../SyncMasterDelayTaskExecuteRunnable.java | 2 +
.../runner/operator/TaskTimeoutOperator.java | 10 +--
11 files changed, 118 insertions(+), 113 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
index 009a684e92..e70199152a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
@@ -99,6 +99,7 @@ public class MasterTaskDispatchProcessor implements
MasterRpcProcessor {
}
} catch (Exception ex) {
log.error("Handle task dispatch request error, command: {}",
taskDispatchRequest, ex);
+
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
sendDispatchFailedResult(channel, message, taskExecutionContext,
ex);
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
index 1ad56cedd3..37a61e6246 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
@@ -26,8 +26,8 @@ import
org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import
org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import lombok.extern.slf4j.Slf4j;
@@ -41,9 +41,6 @@ import io.netty.channel.Channel;
@Component
public class MasterTaskKillProcessor implements MasterRpcProcessor {
- @Autowired
- private MasterTaskExecuteRunnableThreadPool
masterTaskExecuteRunnableThreadPool;
-
@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue
masterDelayTaskExecuteRunnableDelayQueue;
@@ -60,7 +57,7 @@ public class MasterTaskKillProcessor implements
MasterRpcProcessor {
return;
}
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskInstanceId);
+
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
if (masterTaskExecuteRunnable == null) {
log.error("Cannot find the MasterTaskExecuteRunnable, this
task may already been killed");
return;
@@ -71,6 +68,9 @@ public class MasterTaskKillProcessor implements
MasterRpcProcessor {
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
+ } finally {
+
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
+
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
index 5b7176c18f..41c9c50d69 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
@@ -25,12 +25,11 @@ import
org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.task.TaskPauseRequest;
import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import
org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
@@ -39,14 +38,11 @@ import io.netty.channel.Channel;
@Component
public class MasterTaskPauseProcessor implements MasterRpcProcessor {
- @Autowired
- private MasterTaskExecuteRunnableThreadPool
masterTaskExecuteRunnableThreadPool;
-
@Override
public void process(Channel channel, Message message) {
TaskPauseRequest taskPauseRequest =
JSONUtils.parseObject(message.getBody(), TaskPauseRequest.class);
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
+
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
if (masterTaskExecuteRunnable == null) {
log.info("Cannot find the MasterTaskExecuteRunnable");
return;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
index 3b429e5d76..557f2ca447 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +64,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper
extends BaseDaemonTh
final MasterDelayTaskExecuteRunnable
masterDelayTaskExecuteRunnable =
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
+
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has
been interrupted, will stop loop");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
deleted file mode 100644
index 86c95308d8..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-@Slf4j
-@Component
-public class MasterTaskExecuteRunnableThreadPool {
-
- @Autowired
- private MasterConfig masterConfig;
-
- private static final Map<Integer, MasterTaskExecuteRunnable>
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
-
- private ListeningExecutorService listeningExecutorService;
-
- public synchronized void start() {
- log.info("MasterTaskExecuteRunnableThreadPool starting...");
- this.listeningExecutorService =
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
- "MasterTaskExecuteRunnableThread",
masterConfig.getMasterTaskExecuteThreadPoolSize()));
- log.info("MasterTaskExecuteRunnableThreadPool started...");
- }
-
- public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
- ListenableFuture<?> future =
listeningExecutorService.submit(masterTaskExecuteRunnable);
- Futures.addCallback(future, new
MasterTaskExecuteCallback(masterTaskExecuteRunnable),
- this.listeningExecutorService);
-
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
- masterTaskExecuteRunnable);
- }
-
- public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer
taskInstanceId) {
- return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
- }
-
- private static class MasterTaskExecuteCallback implements FutureCallback {
-
- private MasterTaskExecuteRunnable masterTaskExecuteRunnable;
-
- public MasterTaskExecuteCallback(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
- this.masterTaskExecuteRunnable = masterTaskExecuteRunnable;
- }
-
- @Override
- public void onSuccess(Object result) {
- log.info("MasterTaskExecuteRunnable execute success, will remove
this task");
-
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.info("MasterTaskExecuteRunnable execute failed, will remove
this task");
-
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
- }
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
index cb5d07fc20..e9d4fe4430 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
@@ -54,8 +53,9 @@ public class AsyncTaskCallbackFunctionImpl implements
AsyncTaskCallbackFunction
private void executeFinished() {
TaskInstanceLogHeader.printFinalizeTaskHeader();
- TaskExecutionContextCacheManager.removeByTaskInstanceId(
-
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
+ int taskInstanceId =
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
+
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
log.info("Task execute finished, removed the TaskExecutionContext");
asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
log.info(
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
index 973baf978e..e9a9e8dae7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
@@ -22,7 +22,6 @@ import static
ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -59,7 +58,7 @@ public abstract class MasterTaskExecuteRunnable implements
Runnable {
protected void afterThrowing(Throwable throwable) {
try {
cancelTask();
- log.info("Get a exception when execute the task, canceled the
task");
+ log.error("Get a exception when execute the task, canceled the
task", throwable);
} catch (Exception e) {
log.error("Cancel task failed,", e);
}
@@ -68,7 +67,8 @@ public abstract class MasterTaskExecuteRunnable implements
Runnable {
log.info(
"Get a exception when execute the task, sent the task execute
result to master, the current task execute result is {}",
taskExecutionContext.getCurrentExecutionStatus());
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
+
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
log.info("Get a exception when execute the task, removed the
TaskExecutionContext");
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
new file mode 100644
index 0000000000..6b29897611
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@UtilityClass
+public class MasterTaskExecuteRunnableHolder {
+
+ private static final Map<Integer, MasterTaskExecuteRunnable>
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
+
+ public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
+
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
+ masterTaskExecuteRunnable);
+ }
+
+ public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer
taskInstanceId) {
+ return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
+ }
+
+ public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) {
+ SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId);
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
new file mode 100644
index 0000000000..4f542556e4
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+@Slf4j
+@Component
+public class MasterTaskExecuteRunnableThreadPool {
+
+ @Autowired
+ private MasterConfig masterConfig;
+
+ private ListeningExecutorService listeningExecutorService;
+
+ public synchronized void start() {
+ log.info("MasterTaskExecuteRunnableThreadPool starting...");
+ this.listeningExecutorService =
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
+ "MasterTaskExecuteRunnableThread",
masterConfig.getMasterTaskExecuteThreadPoolSize()));
+ log.info("MasterTaskExecuteRunnableThreadPool started...");
+ }
+
+ public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
+ listeningExecutorService.submit(masterTaskExecuteRunnable);
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
index 03d3c0ea6f..04f9321c08 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
@@ -55,6 +55,8 @@ public class SyncMasterDelayTaskExecuteRunnable extends
MasterDelayTaskExecuteRu
"Execute task finished, will send the task execute result to
master, the current task execute result is {}",
taskExecutionContext.getCurrentExecutionStatus().name());
closeLogAppender();
+
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
+
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
index 741a681d8a..17de1e9539 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
@@ -19,12 +19,9 @@ package
org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
-import java.util.Date;
-
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,9 +48,8 @@ public class TaskTimeoutOperator implements TaskOperator {
taskInstance.getName(), taskTimeoutStrategy.name());
return;
}
- log.info("TaskInstance: {} timeout, will kill the task instance",
taskInstance.getName());
- taskInstance.setState(TaskExecutionStatus.FAILURE);
- taskInstance.setEndTime(new Date());
- taskInstanceDao.upsertTaskInstance(taskInstance);
+ taskExecuteRunnable.kill();
+ log.info("TaskInstance: {} timeout, killed the task instance",
taskInstance.getName());
+
}
}