This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5523a62825 Remove taskQueue and looper in worker (#15292)
5523a62825 is described below
commit 5523a62825eb7168de8cb2fda564d5a923b5edd4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 27 19:51:45 2023 +0800
Remove taskQueue and looper in worker (#15292)
---
.../common/thread/ThreadUtils.java | 6 +-
.../server/master/config/MasterConfig.java | 4 +-
...ogicITaskInstanceDispatchOperationFunction.java | 16 +-
.../LogicITaskInstanceKillOperationFunction.java | 8 +-
.../GlobalMasterTaskExecuteRunnableQueue.java | 52 ------
...GlobalMasterTaskExecuteRunnableQueueLooper.java | 84 ---------
.../runner/GlobalTaskDispatchWaitingQueue.java | 8 +-
.../GlobalTaskDispatchWaitingQueueLooper.java | 16 +-
.../master/runner/MasterTaskExecutorBootstrap.java | 6 -
.../runner/dispatcher/TaskDispatchFactory.java | 5 +
.../execute/AsyncMasterTaskDelayQueueLooper.java | 15 +-
.../IMasterTaskExecutorThreadPool.java} | 24 +--
.../execute/MasterAsyncTaskExecutorThreadPool.java | 58 ++++++
....java => MasterSyncTaskExecutorThreadPool.java} | 34 ++--
.../MasterTaskExecutorThreadPoolManager.java | 59 +++++++
.../BaseTaskExecuteRunnableDispatchOperator.java | 2 +-
.../plugin/task/api/AbstractCommandExecutor.java | 16 +-
.../task/api/TaskExecutionContextCacheManager.java | 72 --------
.../plugin/task/api/k8s/impl/K8sTaskExecutor.java | 12 +-
.../plugin/task/api/k8s/K8sTaskExecutorTest.java | 4 +-
.../plugin/task/dvc/DvcTaskTest.java | 2 -
.../plugin/kubeflow/KubeflowTaskTest.java | 2 -
.../plugin/task/mlflow/MlflowTaskTest.java | 2 -
.../plugin/task/pytorch/PytorchTaskTest.java | 2 -
.../server/worker/WorkerServer.java | 38 ++--
.../server/worker/message/MessageRetryRunner.java | 13 +-
.../worker/registry/WorkerRegistryClient.java | 6 +-
.../worker/registry/WorkerWaitingStrategy.java | 11 +-
.../rpc/StreamingTaskInstanceOperatorImpl.java | 19 +-
.../server/worker/rpc/WorkerLogServiceImpl.java | 14 +-
.../runner/GlobalTaskInstanceWaitingQueue.java | 70 --------
.../GlobalTaskInstanceWaitingQueueLooper.java | 101 -----------
.../server/worker/runner/TaskCallbackImpl.java | 18 +-
.../server/worker/runner/WorkerExecService.java | 91 ----------
.../server/worker/runner/WorkerManagerThread.java | 150 ----------------
.../server/worker/runner/WorkerTaskExecutor.java | 9 +-
.../runner/WorkerTaskExecutorFactoryBuilder.java | 33 ++--
.../worker/runner/WorkerTaskExecutorHolder.java | 54 ++++++
.../runner/WorkerTaskExecutorThreadPool.java | 96 ++++++++++
.../TaskInstanceDispatchOperationFunction.java | 20 ++-
.../TaskInstanceKillOperationFunction.java | 37 ++--
.../UpdateWorkflowHostOperationFunction.java | 33 ++--
.../worker/registry/WorkerRegistryClientTest.java | 4 +-
.../runner/WorkerTaskExecutorThreadPoolTest.java | 194 +++++++++++++++++++++
44 files changed, 651 insertions(+), 869 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 3b66552274..5eef04ed82 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.common.thread;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@@ -38,9 +38,9 @@ public class ThreadUtils {
* @param threadsNum threadsNum
* @return ExecutorService
*/
- public static ExecutorService newDaemonFixedThreadExecutor(String
threadName, int threadsNum) {
+ public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String
threadName, int threadsNum) {
ThreadFactory threadFactory = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
- return Executors.newFixedThreadPool(threadsNum, threadFactory);
+ return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum,
threadFactory);
}
public static ScheduledExecutorService
newSingleDaemonScheduledExecutorService(String threadName) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index f1b8485c69..d7f10f03fb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -64,9 +64,9 @@ public class MasterConfig implements Validator {
private int execThreads = 10;
// todo: change to sync thread pool/ async thread pool ?
- private int masterTaskExecuteThreadPoolSize =
Runtime.getRuntime().availableProcessors();
+ private int masterSyncTaskExecutorThreadPoolSize =
Runtime.getRuntime().availableProcessors();
- private int masterAsyncTaskStateCheckThreadPoolSize =
Runtime.getRuntime().availableProcessors();
+ private int masterAsyncTaskExecutorThreadPoolSize =
Runtime.getRuntime().availableProcessors();
/**
* The task dispatch thread pool size.
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
index f69ff4fb77..7590708208 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
@@ -21,10 +21,10 @@ import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchR
import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
@@ -41,7 +41,7 @@ public class LogicITaskInstanceDispatchOperationFunction
private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
@Autowired
- private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
+ private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
@Override
public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest
taskDispatchRequest) {
@@ -62,16 +62,12 @@ public class LogicITaskInstanceDispatchOperationFunction
MasterTaskExecutor masterTaskExecutor =
masterTaskExecutorFactoryBuilder
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
.createMasterTaskExecutor(taskExecutionContext);
- if (globalMasterTaskExecuteRunnableQueue
- .submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
- log.info("Submit LogicTask: {} to
MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
+ if
(masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor)) {
+ log.info("Submit LogicTask: {} to MasterTaskExecutorThreadPool
success", taskInstanceName);
return LogicTaskDispatchResponse.success(taskInstanceId);
} else {
- log.error(
- "Submit LogicTask: {} to
MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue
size: {} is full",
- taskInstanceName,
globalMasterTaskExecuteRunnableQueue.size());
- return LogicTaskDispatchResponse.failed(taskInstanceId,
- "MasterDelayTaskExecuteRunnableDelayQueue is full");
+ log.error("Submit LogicTask: {} to
MasterTaskExecutorThreadPool failed", taskInstanceName);
+ return LogicTaskDispatchResponse.failed(taskInstanceId,
"MasterTaskExecutorThreadPool is full");
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
index 27fc52333d..121c5ae6e1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
@@ -21,10 +21,10 @@ import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<LogicTaskKillRequest,
LogicTaskKillResponse> {
@Autowired
- private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
+ private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
@Override
public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest)
{
@@ -54,8 +54,8 @@ public class LogicITaskInstanceKillOperationFunction
}
try {
masterTaskExecutor.cancelTask();
- globalMasterTaskExecuteRunnableQueue
- .removeMasterTaskExecuteRunnable(masterTaskExecutor);
+ // todo: if we remove success then we don't need to cancel?
+
masterTaskExecutorThreadPool.removeMasterTaskExecutor(masterTaskExecutor);
return LogicTaskKillResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
deleted file mode 100644
index 9005416b92..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.springframework.stereotype.Component;
-
-/**
- *
- */
-@Component
-public class GlobalMasterTaskExecuteRunnableQueue {
-
- private final BlockingQueue<MasterTaskExecutor>
masterTaskExecutorBlockingQueue =
- new LinkedBlockingQueue<>();
-
- public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
- return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
- }
-
- public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws
InterruptedException {
- return masterTaskExecutorBlockingQueue.take();
- }
-
- public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
- return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor);
- }
-
- public int size() {
- return masterTaskExecutorBlockingQueue.size();
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
deleted file mode 100644
index e0b1f80704..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalMasterTaskExecuteRunnableQueueLooper extends
BaseDaemonThread implements AutoCloseable {
-
- @Autowired
- private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
-
- @Autowired
- private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;
-
- private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
-
- public GlobalMasterTaskExecuteRunnableQueueLooper() {
- super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
- }
-
- @Override
- public synchronized void start() {
- if (!RUNNING_FLAG.compareAndSet(false, true)) {
- log.error("The MasterDelayTaskExecuteRunnableDelayQueueLooper
already started, will not start again");
- return;
- }
- log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
- super.start();
- masterTaskExecutorThreadPool.start();
- log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
- }
-
- @Override
- public void run() {
- while (RUNNING_FLAG.get()) {
- try {
- final MasterTaskExecutor masterTaskExecutor =
-
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
-
masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
-
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has
been interrupted, will stop loop");
- break;
- }
- }
- log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stop
loop...");
- }
-
- @Override
- public void close() throws Exception {
- if (RUNNING_FLAG.compareAndSet(true, false)) {
- log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper
stopping...");
- log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper
stopped...");
- }
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index f4d50537c6..7e0d683571 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -23,17 +23,21 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
+/**
+ * Stores the {@link TaskExecuteRunnable}s that need to be dispatched. They are kept in a {@link DelayQueue};
+ * once a {@link TaskExecuteRunnable}'s delay time reaches 0, it is consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
+ */
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
private final DelayQueue<DefaultTaskExecuteRunnable> queue = new
DelayQueue<>();
- public void
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable
priorityTaskExecuteRunnable) {
+ public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable
priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
}
- public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable()
throws InterruptedException {
+ public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws
InterruptedException {
return queue.take();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index b496bea5a5..a1f4b28783 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -42,7 +42,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
- private final AtomicInteger DISPATCHED_TIMES = new AtomicInteger();
+ private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new
AtomicInteger();
private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
@@ -66,24 +66,24 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
while (RUNNING_FLAG.get()) {
try {
- defaultTaskExecuteRunnable =
globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
+ defaultTaskExecuteRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
} catch (InterruptedException e) {
log.warn("Get waiting dispatch task failed, the current thread
has been interrupted, will stop loop");
Thread.currentThread().interrupt();
break;
}
try {
- final TaskDispatcher taskDispatcher = taskDispatchFactory
-
.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
+ TaskDispatcher taskDispatcher =
+
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
- DISPATCHED_TIMES.set(0);
+ DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
} catch (Exception e) {
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
-
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(defaultTaskExecuteRunnable);
- if (DISPATCHED_TIMES.incrementAndGet() >
MAX_DISPATCHED_FAILED_TIMES) {
+
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
+ if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() >
MAX_DISPATCHED_FAILED_TIMES) {
ThreadUtils.sleep(10 * 1000L);
}
- log.error("Dispatch task failed", e);
+ log.error("Dispatch Task: {} failed",
defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
}
}
log.info("GlobalTaskDispatchWaitingQueueLooper started...");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
index 3e99d2141c..7d3bf6940b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
@@ -31,16 +31,12 @@ public class MasterTaskExecutorBootstrap implements
AutoCloseable {
@Autowired
private GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper;
- @Autowired
- private GlobalMasterTaskExecuteRunnableQueueLooper
globalMasterTaskExecuteRunnableQueueLooper;
-
@Autowired
private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;
public synchronized void start() {
log.info("MasterTaskExecutorBootstrap starting...");
globalTaskDispatchWaitingQueueLooper.start();
- globalMasterTaskExecuteRunnableQueueLooper.start();
asyncMasterTaskDelayQueueLooper.start();
log.info("MasterTaskExecutorBootstrap started...");
}
@@ -51,8 +47,6 @@ public class MasterTaskExecutorBootstrap implements
AutoCloseable {
try (
final GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper1 =
globalTaskDispatchWaitingQueueLooper;
- final GlobalMasterTaskExecuteRunnableQueueLooper
globalMasterTaskExecuteRunnableQueueLooper1 =
- globalMasterTaskExecuteRunnableQueueLooper;
final AsyncMasterTaskDelayQueueLooper
asyncMasterTaskDelayQueueLooper1 =
asyncMasterTaskDelayQueueLooper) {
// closed the resource
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
index 1979b48de5..52469fb54c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.dispatcher;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import lombok.extern.slf4j.Slf4j;
@@ -38,4 +39,8 @@ public class TaskDispatchFactory {
return TaskUtils.isMasterTask(taskType) ? masterTaskDispatcher :
workerTaskDispatcher;
}
+ public TaskDispatcher getTaskDispatcher(TaskInstance taskInstance) {
+ return getTaskDispatcher(taskInstance.getTaskType());
+ }
+
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
index 63b8636be6..86b711f245 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
@@ -18,12 +18,9 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@@ -39,12 +36,9 @@ public class AsyncMasterTaskDelayQueueLooper extends
BaseDaemonThread implements
private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
@Autowired
- private MasterConfig masterConfig;
-
+ private MasterAsyncTaskExecutorThreadPool
masterAsyncTaskExecutorThreadPool;
private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
- private ExecutorService asyncTaskStateCheckThreadPool;
-
public AsyncMasterTaskDelayQueueLooper() {
super("AsyncMasterTaskDelayQueueLooper");
}
@@ -63,8 +57,6 @@ public class AsyncMasterTaskDelayQueueLooper extends
BaseDaemonThread implements
@Override
public void run() {
- asyncTaskStateCheckThreadPool =
ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool",
- masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize());
while (RUNNING_FLAG.get()) {
AsyncTaskExecutionContext asyncTaskExecutionContext;
try {
@@ -86,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends
BaseDaemonThread implements
"Cannot find the taskInstance from
TaskExecutionContextCacheManager, the task may already been killed, will stop
the async master task");
continue;
}
- asyncTaskStateCheckThreadPool.submit(() -> {
+ masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() ->
{
final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
@@ -131,8 +123,5 @@ public class AsyncMasterTaskDelayQueueLooper extends
BaseDaemonThread implements
log.warn("The AsyncMasterTaskDelayQueueLooper is not started, will
not close");
return;
}
- log.info("AsyncMasterTaskDelayQueueLooper closing...");
- asyncTaskStateCheckThreadPool.shutdown();
- log.info("AsyncMasterTaskDelayQueueLooper closed...");
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
similarity index 55%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
index 1979b48de5..6c8e2caa73 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
@@ -15,27 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.dispatcher;
+package org.apache.dolphinscheduler.server.master.runner.execute;
-import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
+public interface IMasterTaskExecutorThreadPool<T extends MasterTaskExecutor> {
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class TaskDispatchFactory {
-
- @Autowired
- private MasterTaskDispatcher masterTaskDispatcher;
-
- @Autowired
- private WorkerTaskDispatcher workerTaskDispatcher;
-
- public TaskDispatcher getTaskDispatcher(String taskType) {
- return TaskUtils.isMasterTask(taskType) ? masterTaskDispatcher :
workerTaskDispatcher;
- }
+ boolean submitMasterTaskExecutor(T masterTaskExecutor);
+ boolean removeMasterTaskExecutor(T masterTaskExecutor);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
new file mode 100644
index 0000000000..868b66b6df
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MasterAsyncTaskExecutorThreadPool implements
IMasterTaskExecutorThreadPool<AsyncMasterTaskExecutor> {
+
+ private final ThreadPoolExecutor threadPoolExecutor;
+
+ public MasterAsyncTaskExecutorThreadPool(MasterConfig masterConfig) {
+ this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("MasterAsyncTaskExecutorThreadPool",
+ masterConfig.getMasterSyncTaskExecutorThreadPoolSize());
+ }
+
+ @Override
+ public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor
asyncMasterTaskExecutor) {
+ synchronized (MasterAsyncTaskExecutorThreadPool.class) {
+ // todo: check if the thread pool is overloaded
+ threadPoolExecutor.submit(asyncMasterTaskExecutor);
+ return true;
+ }
+ }
+
+ @Override
+ public boolean removeMasterTaskExecutor(AsyncMasterTaskExecutor
asyncMasterTaskExecutor) {
+ return threadPoolExecutor.remove(asyncMasterTaskExecutor);
+ }
+
+ // todo: remove this method, it's not a good idea to expose the ThreadPoolExecutor to the outside.
+ ThreadPoolExecutor getThreadPool() {
+ return threadPoolExecutor;
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
similarity index 53%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
index d61d058d22..3f683076e6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
@@ -20,32 +20,34 @@ package
org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import java.util.concurrent.ThreadPoolExecutor;
+
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
@Slf4j
@Component
-public class MasterTaskExecutorThreadPool {
+public class MasterSyncTaskExecutorThreadPool implements
IMasterTaskExecutorThreadPool<SyncMasterTaskExecutor> {
- @Autowired
- private MasterConfig masterConfig;
+ private final ThreadPoolExecutor threadPoolExecutor;
- private ListeningExecutorService listeningExecutorService;
-
- public synchronized void start() {
- log.info("MasterTaskExecuteRunnableThreadPool starting...");
- this.listeningExecutorService =
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
- "MasterTaskExecuteRunnableThread",
masterConfig.getMasterTaskExecuteThreadPoolSize()));
- log.info("MasterTaskExecuteRunnableThreadPool started...");
+ // Builds the backing pool via ThreadUtils.newDaemonFixedThreadExecutor, passing the name
+ // "MasterSyncTaskExecutorThreadPool" and the size read from
+ // masterConfig.getMasterSyncTaskExecutorThreadPoolSize().
+ public MasterSyncTaskExecutorThreadPool(MasterConfig masterConfig) {
+ this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("MasterSyncTaskExecutorThreadPool",
+ masterConfig.getMasterSyncTaskExecutorThreadPoolSize());
}
- public void submitMasterTaskExecutor(MasterTaskExecutor
masterTaskExecutor) {
- listeningExecutorService.submit(masterTaskExecutor);
+    /**
+     * Hands the executor to the pool.
+     *
+     * @param syncMasterTaskExecutor the executor to run
+     * @return always {@code true}; no overload check is performed yet
+     */
+    @Override
+    public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) {
+        synchronized (MasterSyncTaskExecutorThreadPool.class) {
+            // todo: check if the thread pool is overloaded
+            // Use execute() rather than submit(): submit() wraps the task in a FutureTask, so the object
+            // held in the work queue would no longer be this executor and a later
+            // ThreadPoolExecutor.remove(executor) could never match it.
+            threadPoolExecutor.execute(syncMasterTaskExecutor);
+            return true;
+        }
     }
+ /**
+ * Asks the underlying pool to drop the given executor via ThreadPoolExecutor#remove(Runnable).
+ *
+ * NOTE(review): ThreadPoolExecutor#remove only matches the exact Runnable object held in the
+ * work queue; a task handed over with submit(...) is wrapped in a Future and will not match.
+ * Verify this against how executors are enqueued.
+ *
+ * @param syncMasterTaskExecutor the executor to remove
+ * @return the value returned by ThreadPoolExecutor#remove
+ */
+ @Override
+ public boolean removeMasterTaskExecutor(SyncMasterTaskExecutor
syncMasterTaskExecutor) {
+ return threadPoolExecutor.remove(syncMasterTaskExecutor);
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
new file mode 100644
index 0000000000..1f4a916897
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Routes a {@link MasterTaskExecutor} to the thread pool that matches its concrete kind:
+ * sync executors go to the sync pool, async executors to the async pool.
+ */
+@Slf4j
+@Component
+public class MasterTaskExecutorThreadPoolManager {
+
+    @Autowired
+    private MasterSyncTaskExecutorThreadPool masterSyncTaskExecutorThreadPool;
+
+    @Autowired
+    private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool;
+
+    /**
+     * Submits the executor to the pool matching its type.
+     *
+     * @param masterTaskExecutor the executor to submit
+     * @return whatever the selected pool's {@code submitMasterTaskExecutor} returns
+     * @throws IllegalArgumentException if the executor is neither sync nor async (this includes {@code null})
+     */
+    public boolean submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) {
+        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
+            SyncMasterTaskExecutor syncExecutor = (SyncMasterTaskExecutor) masterTaskExecutor;
+            return masterSyncTaskExecutorThreadPool.submitMasterTaskExecutor(syncExecutor);
+        } else if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
+            AsyncMasterTaskExecutor asyncExecutor = (AsyncMasterTaskExecutor) masterTaskExecutor;
+            return masterAsyncTaskExecutorThreadPool.submitMasterTaskExecutor(asyncExecutor);
+        }
+        throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor);
+    }
+
+    /**
+     * Removes the executor from the pool matching its type.
+     *
+     * @param masterTaskExecutor the executor to remove
+     * @return whatever the selected pool's {@code removeMasterTaskExecutor} returns
+     * @throws IllegalArgumentException if the executor is neither sync nor async (this includes {@code null})
+     */
+    public boolean removeMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) {
+        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
+            SyncMasterTaskExecutor syncExecutor = (SyncMasterTaskExecutor) masterTaskExecutor;
+            return masterSyncTaskExecutorThreadPool.removeMasterTaskExecutor(syncExecutor);
+        } else if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
+            AsyncMasterTaskExecutor asyncExecutor = (AsyncMasterTaskExecutor) masterTaskExecutor;
+            return masterAsyncTaskExecutorThreadPool.removeMasterTaskExecutor(asyncExecutor);
+        }
+        throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor);
+    }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 6f0419ae97..cb2c7a0e07 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -52,6 +52,6 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator
implements TaskExe
taskInstance.getName(),
taskInstance.getDelayTime(), remainTime);
}
-
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
+
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 8aa4c36af2..4203516f42 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -105,13 +105,7 @@ public abstract class AbstractCommandExecutor {
TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
- if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
- log.warn(
- "Cannot find the taskInstance: {} from
TaskExecutionContextCacheManager, the task might already been killed",
- taskInstanceId);
- result.setExitStatusCode(EXIT_CODE_KILL);
- return result;
- }
+ // todo: track task state (similar to the JDK Thread state) so that a task which has already been killed is never executed
iShellInterceptorBuilder = iShellInterceptorBuilder
.shellDirectory(taskRequest.getExecutePath())
.shellName(taskRequest.getTaskAppId());
@@ -155,13 +149,7 @@ public abstract class AbstractCommandExecutor {
// cache processId
taskRequest.setProcessId(processId);
- boolean updateTaskExecutionContextStatus =
-
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
- if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
- result.setExitStatusCode(EXIT_CODE_KILL);
- cancelApplication();
- return result;
- }
+
// print process id
log.info("process start, process id is: {}", processId);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
deleted file mode 100644
index dbe39d7042..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TaskExecutionContextCacheManager {
-
- private TaskExecutionContextCacheManager() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * taskInstance cache
- */
- private static final Map<Integer, TaskExecutionContext>
taskRequestContextCache = new ConcurrentHashMap<>();
-
- /**
- * get taskInstance by taskInstance id
- *
- * @param taskInstanceId taskInstanceId
- * @return taskInstance
- */
-
- public static TaskExecutionContext getByTaskInstanceId(Integer
taskInstanceId) {
- return taskRequestContextCache.get(taskInstanceId);
- }
-
- /**
- * cache taskInstance
- *
- * @param request request
- */
- public static void cacheTaskExecutionContext(TaskExecutionContext request)
{
- taskRequestContextCache.put(request.getTaskInstanceId(), request);
- }
-
- /**
- * remove taskInstance by taskInstanceId
- *
- * @param taskInstanceId taskInstanceId
- */
- public static void removeByTaskInstanceId(Integer taskInstanceId) {
- taskRequestContextCache.remove(taskInstanceId);
- }
-
- public static boolean updateTaskExecutionContext(TaskExecutionContext
request) {
- taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(),
(k, v) -> request);
- return
taskRequestContextCache.containsKey(request.getTaskInstanceId());
- }
-
- public static Collection<TaskExecutionContext> getAllTaskRequestList() {
- return taskRequestContextCache.values();
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index f5f794859b..1ce6b12c22 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -21,7 +21,6 @@ import static java.util.Collections.singletonList;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
@@ -39,7 +38,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
@@ -284,12 +282,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
try {
- if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
- result.setExitStatusCode(EXIT_CODE_KILL);
- return result;
- }
if (StringUtils.isEmpty(k8sParameterStr)) {
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
K8sTaskExecutionContext k8sTaskExecutionContext =
taskRequest.getK8sTaskExecutionContext();
@@ -371,10 +364,7 @@ public class K8sTaskExecutor extends
AbstractK8sTaskExecutor {
public void setTaskStatus(int jobStatus, String taskInstanceId,
TaskResponse taskResponse) {
if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
- if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId)))
{
- log.info("[K8sJobExecutor-{}] killed",
job.getMetadata().getName());
- taskResponse.setExitStatusCode(EXIT_CODE_KILL);
- } else if (jobStatus == EXIT_CODE_SUCCESS) {
+ if (jobStatus == EXIT_CODE_SUCCESS) {
log.info("[K8sJobExecutor-{}] succeed in k8s",
job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
} else {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 4bbd29d18e..1e7629acce 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
@@ -90,7 +88,7 @@ public class K8sTaskExecutorTest {
TaskResponse taskResponse = new TaskResponse();
k8sTaskExecutor.setJob(job);
k8sTaskExecutor.setTaskStatus(jobStatus,
String.valueOf(taskInstanceId), taskResponse);
- Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL,
taskResponse.getExitStatusCode()));
+ Assertions.assertEquals(0, taskResponse.getExitStatusCode());
}
@Test
public void testWaitTimeoutNormal() {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
index 3f87271015..204b024e19 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.dvc;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -35,7 +34,6 @@ public class DvcTaskTest {
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
index e6df60491d..0f70b67f75 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import java.io.File;
import java.io.IOException;
@@ -127,7 +126,6 @@ public class KubeflowTaskTest {
}
Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
.thenReturn(kubeflowParameters.getClusterYAML());
-
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
index cf746799c6..4196e208e7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinler.plugin.task.mlflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask;
@@ -58,7 +57,6 @@ public class MlflowTaskTest {
String parameters = JSONUtils.toJsonString(mlflowParameters);
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
index 25b975b885..c213021607 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
@@ -21,7 +21,6 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.commons.lang3.SystemUtils;
@@ -198,7 +197,6 @@ public class PytorchTaskTest {
String parameters = JSONUtils.toJsonString(pytorchParameters);
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
-
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index e61409c9d2..e8ae5381fd 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -22,15 +22,14 @@ import
org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.commons.collections4.CollectionUtils;
@@ -52,9 +51,6 @@ import
org.springframework.transaction.annotation.EnableTransactionManagement;
@Slf4j
public class WorkerServer implements IStoppable {
- @Autowired
- private WorkerManagerThread workerManagerThread;
-
@Autowired
private WorkerRegistryClient workerRegistryClient;
@@ -67,9 +63,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private MessageRetryRunner messageRetryRunner;
- @Autowired
- private GlobalTaskInstanceWaitingQueueLooper
globalTaskInstanceWaitingQueueLooper;
-
/**
* worker server startup, not use web service
*
@@ -88,10 +81,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
- this.workerManagerThread.start();
-
this.messageRetryRunner.start();
- this.globalTaskInstanceWaitingQueueLooper.start();
/*
* registry hooks, which are called before the process exits
@@ -114,7 +104,13 @@ public class WorkerServer implements IStoppable {
WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient =
workerRegistryClient) {
log.info("Worker server is stopping, current cause : {}", cause);
- // kill running tasks
+ // todo: we need to remove this method,
+ // since for some tasks we need to take over the remote task after the worker restarts.
+ // Also, if the worker crashes, `killAllRunningTasks` will not be executed, which leads to two
+ // kinds of situation:
+ // 1. If the worker is stopped by kill, the tasks will be killed.
+ // 2. If the worker is stopped by kill -9, the tasks will not be killed.
+ // So we don't need to kill the tasks.
this.killAllRunningTasks();
} catch (Exception e) {
log.error("Worker server stop failed, current cause: {}", cause,
e);
@@ -129,25 +125,25 @@ public class WorkerServer implements IStoppable {
}
public void killAllRunningTasks() {
- Collection<TaskExecutionContext> taskRequests =
TaskExecutionContextCacheManager.getAllTaskRequestList();
- if (CollectionUtils.isEmpty(taskRequests)) {
+ Collection<WorkerTaskExecutor> workerTaskExecutors =
WorkerTaskExecutorHolder.getAllTaskExecutor();
+ if (CollectionUtils.isEmpty(workerTaskExecutors)) {
return;
}
- log.info("Worker begin to kill all cache task, task size: {}",
taskRequests.size());
+ log.info("Worker begin to kill all cache task, task size: {}",
workerTaskExecutors.size());
int killNumber = 0;
- for (TaskExecutionContext taskRequest : taskRequests) {
+ for (WorkerTaskExecutor workerTaskExecutor : workerTaskExecutors) {
// kill task when it's not finished yet
try {
-
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
- taskRequest.getTaskInstanceId());
- if (ProcessUtils.kill(taskRequest)) {
+ TaskExecutionContext taskExecutionContext =
workerTaskExecutor.getTaskExecutionContext();
+
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
+ if (ProcessUtils.kill(taskExecutionContext)) {
killNumber++;
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
- log.info("Worker after kill all cache task, task size: {}, killed
number: {}", taskRequests.size(),
+ log.info("Worker after kill all cache task, task size: {}, killed
number: {}", workerTaskExecutors.size(),
killNumber);
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 2b9ce2ec5f..28a4d8b214 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -23,6 +23,7 @@ import
org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import java.time.Duration;
@@ -92,13 +93,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
needToRetryMessages.remove(taskInstanceId);
}
- public void updateMessageHost(int taskInstanceId, String
messageReceiverHost) {
+ public boolean updateMessageHost(int taskInstanceId, String
messageReceiverHost) {
List<TaskInstanceMessage> taskInstanceMessages =
this.needToRetryMessages.get(taskInstanceId);
- if (taskInstanceMessages != null) {
- taskInstanceMessages.forEach(taskInstanceMessage -> {
-
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
- });
+ if (CollectionUtils.isEmpty(taskInstanceMessages)) {
+ return false;
}
+ taskInstanceMessages.forEach(taskInstanceMessage -> {
+
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
+ });
+ return true;
}
public void run() {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index ab60de9a77..b383af752d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -30,7 +30,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.commons.collections4.CollectionUtils;
@@ -55,7 +55,7 @@ public class WorkerRegistryClient implements AutoCloseable {
private WorkerConfig workerConfig;
@Autowired
- private WorkerManagerThread workerManagerThread;
+ private WorkerTaskExecutorThreadPool workerManagerThread;
@Autowired
private RegistryClient registryClient;
@@ -71,7 +71,7 @@ public class WorkerRegistryClient implements AutoCloseable {
this.workerHeartBeatTask = new WorkerHeartBeatTask(
workerConfig,
registryClient,
- () -> workerManagerThread.getWaitSubmitQueueSize());
+ () -> workerManagerThread.getWaitingTaskExecutorSize());
}
public void start() {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index dc97a2c9a4..b2c3d0b606 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
@@ -54,10 +54,7 @@ public class WorkerWaitingStrategy implements
WorkerConnectStrategy {
private MessageRetryRunner messageRetryRunner;
@Autowired
- private WorkerManagerThread workerManagerThread;
-
- @Autowired
- private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
+ private WorkerTaskExecutorThreadPool workerManagerThread;
@Override
public void disconnect() {
@@ -121,7 +118,7 @@ public class WorkerWaitingStrategy implements
WorkerConnectStrategy {
workerRpcServer.close();
log.warn("Worker server close the RPC server due to lost connection
from registry");
workerManagerThread.clearTask();
- globalTaskInstanceWaitingQueue.clearTask();
+ WorkerTaskExecutorHolder.clear();
log.warn("Worker server clear the tasks due to lost connection from
registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection
from registry");
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
index c57ee2efca..51b75aa457 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
@@ -21,12 +21,11 @@ import
org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +37,7 @@ import org.springframework.stereotype.Component;
public class StreamingTaskInstanceOperatorImpl implements
IStreamingTaskInstanceOperator {
@Autowired
- private WorkerManagerThread workerManager;
+ private WorkerTaskExecutorThreadPool workerManager;
@Override
public TaskInstanceTriggerSavepointResponse
triggerSavepoint(TaskInstanceTriggerSavepointRequest
taskInstanceTriggerSavepointRequest) {
@@ -47,16 +46,10 @@ public class StreamingTaskInstanceOperatorImpl implements
IStreamingTaskInstance
try {
int taskInstanceId =
taskInstanceTriggerSavepointRequest.getTaskInstanceId();
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- log.error("Cannot find TaskExecutionContext for taskInstance:
{}", taskInstanceId);
- return TaskInstanceTriggerSavepointResponse.fail("Cannot find
TaskExecutionContext");
- }
- WorkerTaskExecutor workerTaskExecutor =
workerManager.getTaskExecuteThread(taskInstanceId);
+ WorkerTaskExecutor workerTaskExecutor =
WorkerTaskExecutorHolder.get(taskInstanceId);
if (workerTaskExecutor == null) {
- log.error("Cannot find WorkerTaskExecuteRunnable for
taskInstance: {}", taskInstanceId);
- return TaskInstanceTriggerSavepointResponse.fail("Cannot find
WorkerTaskExecuteRunnable");
+ log.error("Cannot find WorkerTaskExecutor for taskInstance:
{}", taskInstanceId);
+ return TaskInstanceTriggerSavepointResponse.fail("Cannot find
TaskExecutionContext");
}
AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
index 24b5477cce..a8a0c5a2b7 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
@@ -30,8 +30,8 @@ import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFil
import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import java.util.List;
@@ -64,9 +64,13 @@ public class WorkerLogServiceImpl implements ILogService {
@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId());
- String appInfoPath = taskExecutionContext.getAppInfoPath();
+ String appInfoPath = null;
+ WorkerTaskExecutor workerTaskExecutor =
WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId());
+ if (workerTaskExecutor != null) {
+ // todo: remove this kind of logic, and remove get appId method,
the appId should be send by worker rather
+ // than query by master
+ appInfoPath =
workerTaskExecutor.getTaskExecutionContext().getAppInfoPath();
+ }
String logPath = getAppIdRequest.getLogPath();
List<String> appIds =
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath,
appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
deleted file mode 100644
index e1ac97b2d3..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskInstanceWaitingQueue {
-
- private final WorkerConfig workerConfig;
-
- private final BlockingQueue<TaskExecutionContext> blockingQueue;
-
- public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) {
- this.workerConfig = workerConfig;
- this.blockingQueue = new
ArrayBlockingQueue<>(workerConfig.getExecThreads());
- }
-
- public boolean addDispatchTask(TaskExecutionContext taskExecutionContext) {
- if (workerConfig.getTaskExecuteThreadsFullPolicy() ==
TaskExecuteThreadsFullPolicy.CONTINUE) {
- return blockingQueue.offer(taskExecutionContext);
- }
-
- if (blockingQueue.size() > getQueueSize()) {
- log.warn("Wait submit queue is full, will retry submit task
later");
- WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
- return false;
- }
- return blockingQueue.offer(taskExecutionContext);
- }
-
- public TaskExecutionContext take() throws InterruptedException {
- return blockingQueue.take();
- }
-
- public void clearTask() {
- blockingQueue.clear();
- }
-
- public int getQueueSize() {
- return workerConfig.getExecThreads();
- }
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
deleted file mode 100644
index 6e501a1be6..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread {
-
- @Autowired
- private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
-
- @Autowired
- private WorkerConfig workerConfig;
-
- @Autowired
- private WorkerMessageSender workerMessageSender;
-
- @Autowired
- private TaskPluginManager taskPluginManager;
-
- @Autowired
- private WorkerManagerThread workerManager;
-
- @Autowired(required = false)
- private StorageOperate storageOperate;
-
- @Autowired
- private WorkerRegistryClient workerRegistryClient;
-
- protected GlobalTaskInstanceWaitingQueueLooper() {
- super("GlobalTaskDispatchQueueLooper");
- }
-
- public synchronized void start() {
- log.info("GlobalTaskDispatchQueueLooper starting");
- super.start();
- log.info("GlobalTaskDispatchQueueLooper started");
- }
-
- public void run() {
- while (true) {
- try {
- TaskExecutionContext taskExecutionContext =
globalTaskInstanceWaitingQueue.take();
-
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
-
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
-
- WorkerTaskExecutor workerTaskExecutor =
WorkerTaskExecutorFactoryBuilder
- .createWorkerTaskExecutorFactory(
- taskExecutionContext,
- workerConfig,
- workerMessageSender,
- taskPluginManager,
- storageOperate,
- workerRegistryClient)
- .createWorkerTaskExecutor();
- if (workerManager.offer(workerTaskExecutor)) {
- log.info("Success submit WorkerDelayTaskExecuteRunnable to
WorkerManagerThread's waiting queue");
- }
- } catch (InterruptedException e) {
- log.error("GlobalTaskDispatchQueueLooper interrupted");
- Thread.currentThread().interrupt();
- break;
- } catch (Exception ex) {
- log.error("GlobalTaskDispatchQueueLooper error", ex);
- } finally {
- LogUtils.removeTaskInstanceIdMDC();
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- }
- }
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
index be8a62a6c8..aeb7d658d1 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
import
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@@ -42,15 +41,7 @@ public class TaskCallbackImpl implements TaskCallBack {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- log.error("task execution context is empty, taskInstanceId: {},
applicationInfo:{}", taskInstanceId,
- applicationInfo);
- return;
- }
-
- log.info("send remote application info {}", applicationInfo);
+ // todo: use listener
taskExecutionContext.setAppIds(applicationInfo.getAppIds());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
@@ -58,13 +49,6 @@ public class TaskCallbackImpl implements TaskCallBack {
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- log.error("task execution context is empty, taskInstanceId: {}",
taskInstanceId);
- return;
- }
-
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
deleted file mode 100644
index 2a6b7feec2..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import lombok.extern.slf4j.Slf4j;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-@Slf4j
-public class WorkerExecService {
-
- private final ListeningExecutorService listeningExecutorService;
-
- private final ExecutorService execService;
-
- private final ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap;
-
- public WorkerExecService(ExecutorService execService,
- ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap) {
- this.execService = execService;
- this.listeningExecutorService =
MoreExecutors.listeningDecorator(this.execService);
- this.taskExecuteThreadMap = taskExecuteThreadMap;
-
WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
- }
-
- public void submit(final WorkerTaskExecutor taskExecuteThread) {
-
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
taskExecuteThread);
- ListenableFuture future =
this.listeningExecutorService.submit(taskExecuteThread);
- FutureCallback futureCallback = new FutureCallback() {
-
- @Override
- public void onSuccess(Object o) {
-
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- log.error("task execute failed, processInstanceId:{},
taskInstanceId:{}",
-
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
-
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
- throwable);
-
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
- }
- };
- Futures.addCallback(future, futureCallback,
this.listeningExecutorService);
- }
-
- /**
- * get thread pool queue size
- *
- * @return queue size
- */
- public int getThreadPoolQueueSize() {
- return ((ThreadPoolExecutor) this.execService).getQueue().size();
- }
-
- public int getActiveExecThreadCount() {
- return ((ThreadPoolExecutor) this.execService).getActiveCount();
- }
-
- public Map<Integer, WorkerTaskExecutor> getTaskExecuteThreadMap() {
- return taskExecuteThreadMap;
- }
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
deleted file mode 100644
index 4b666cfb20..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-/**
- * Manage tasks
- */
-@Component
-@Slf4j
-public class WorkerManagerThread implements Runnable {
-
- private final BlockingQueue<WorkerTaskExecutor> waitSubmitQueue;
- private final WorkerExecService workerExecService;
-
- private final int workerExecThreads;
-
- private final ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap = new ConcurrentHashMap<>();
-
- public WorkerManagerThread(WorkerConfig workerConfig) {
- workerExecThreads = workerConfig.getExecThreads();
- this.waitSubmitQueue = new LinkedBlockingQueue<>();
- workerExecService = new WorkerExecService(
-
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads()),
- taskExecuteThreadMap);
- }
-
- public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer
taskInstanceId) {
- return taskExecuteThreadMap.get(taskInstanceId);
- }
-
- /**
- * get wait submit queue size
- *
- * @return queue size
- */
- public int getWaitSubmitQueueSize() {
- return waitSubmitQueue.size();
- }
-
- /**
- * get thread pool queue size
- *
- * @return queue size
- */
- public int getThreadPoolQueueSize() {
- return workerExecService.getThreadPoolQueueSize();
- }
-
- /**
- * Kill tasks that have not been executed, like delay task
- * then send Response to Master, update the execution status of task
instance
- */
- public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
- waitSubmitQueue.stream()
- .filter(taskExecuteThread ->
taskExecuteThread.getTaskExecutionContext()
- .getTaskInstanceId() == taskInstanceId)
- .forEach(waitSubmitQueue::remove);
- }
-
- public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) {
- return waitSubmitQueue.add(workerDelayTaskExecuteRunnable);
- }
-
- public void start() {
- log.info("Worker manager thread starting");
- Thread thread = new Thread(this, this.getClass().getName());
- thread.setDaemon(true);
- thread.start();
- log.info("Worker manager thread started");
- }
-
- @Override
- public void run() {
-
WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
-
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
-
WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
-
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize);
-
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount);
-
- Thread.currentThread().setName("Worker-Execute-Manager-Thread");
- while (!ServerLifeCycleManager.isStopped()) {
- try {
- if (!ServerLifeCycleManager.isRunning()) {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- }
- if (this.getThreadPoolQueueSize() <= workerExecThreads) {
- WorkerTaskExecutor workerTaskExecutor =
waitSubmitQueue.take();
- workerExecService.submit(workerTaskExecutor);
- } else {
- WorkerServerMetrics.incWorkerOverloadCount();
- log.info("Exec queue is full, waiting submit queue {},
waiting exec queue size {}",
- this.getWaitSubmitQueueSize(),
this.getThreadPoolQueueSize());
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
- }
- } catch (Exception e) {
- log.error("An unexpected interrupt is happened, "
- + "the exception will be ignored and this thread will
continue to run", e);
- }
- }
- }
-
- public void clearTask() {
- waitSubmitQueue.clear();
-
workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable
-> {
- int taskInstanceId =
workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
- try {
- workerTaskExecuteRunnable.cancelTask();
- log.info("Cancel the taskInstance in worker {}",
taskInstanceId);
- } catch (Exception ex) {
- log.error("Cancel the taskInstance error {}", taskInstanceId,
ex);
- } finally {
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
- }
- });
- workerExecService.getTaskExecuteThreadMap().clear();
- }
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index e47a28d264..f713605a48 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -39,7 +39,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -108,7 +107,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
sendTaskResult();
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
log.info("Remove the current task execute context from worker cache");
clearTaskExecPathIfNeeded();
@@ -118,7 +117,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
if (cancelTask()) {
log.info("Cancel the task successfully");
}
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(System.currentTimeMillis());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
@@ -128,7 +127,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
}
- public boolean cancelTask() {
+ protected boolean cancelTask() {
// cancel the task
if (task == null) {
return true;
@@ -157,7 +156,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
log.info(
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index c2efdc9c7a..a9c2948482 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -24,20 +24,31 @@ import
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import javax.annotation.Nullable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
-import lombok.NonNull;
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
+@Component
public class WorkerTaskExecutorFactoryBuilder {
- public static WorkerTaskExecutorFactory<? extends WorkerTaskExecutor>
createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext
taskExecutionContext,
-
@NonNull WorkerConfig workerConfig,
-
@NonNull WorkerMessageSender workerMessageSender,
-
@NonNull TaskPluginManager taskPluginManager,
-
@Nullable StorageOperate storageOperate,
-
@NonNull WorkerRegistryClient workerRegistryClient) {
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Autowired
+ private WorkerMessageSender workerMessageSender;
+
+ @Autowired
+ private TaskPluginManager taskPluginManager;
+
+ @Autowired
+ private WorkerTaskExecutorThreadPool workerManager;
+
+ @Autowired(required = false)
+ private StorageOperate storageOperate;
+
+ @Autowired
+ private WorkerRegistryClient workerRegistryClient;
+
+ public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor>
createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
workerMessageSender,
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
new file mode 100644
index 0000000000..fa07dbfde6
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Static registry of the {@link WorkerTaskExecutor}s held by this worker, keyed by task instance id.
+ * <p>
+ * Used to store all running and waiting {@link WorkerTaskExecutor}. If the task has been finished, it will be
+ * removed from the map.
+ * <p>
+ * The registry is shared by every caller in the JVM: executors remove themselves when they finish, RPC handlers
+ * look executors up by id, and the registry-disconnect path clears the whole map. These calls may overlap, so the
+ * map is a {@link ConcurrentHashMap} and registration uses an atomic put-if-absent.
+ */
+public class WorkerTaskExecutorHolder {
+
+    private static final Map<Integer, WorkerTaskExecutor> workerTaskExecutorMap = new ConcurrentHashMap<>();
+
+    /**
+     * Registers the given executor under the task instance id of its execution context.
+     *
+     * @param workerTaskExecutor the executor to register
+     * @throws IllegalArgumentException if an executor is already registered for the same task instance id
+     */
+    public static void put(WorkerTaskExecutor workerTaskExecutor) {
+        int taskInstanceId = workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId();
+        // putIfAbsent makes the "already registered?" check and the insert a single atomic step, so two
+        // concurrent registrations of the same id cannot both succeed.
+        if (workerTaskExecutorMap.putIfAbsent(taskInstanceId, workerTaskExecutor) != null) {
+            throw new IllegalArgumentException("TaskInstance: " + taskInstanceId + " already exists");
+        }
+    }
+
+    /**
+     * Looks up the executor registered for the given task instance id.
+     *
+     * @param taskInstanceId the task instance id
+     * @return the registered executor, or {@code null} if none is registered
+     */
+    public static WorkerTaskExecutor get(int taskInstanceId) {
+        return workerTaskExecutorMap.get(taskInstanceId);
+    }
+
+    /**
+     * Removes the executor registered for the given task instance id.
+     *
+     * @param taskInstanceId the task instance id
+     * @return the executor that was removed, or {@code null} if none was registered
+     */
+    public static WorkerTaskExecutor remove(int taskInstanceId) {
+        return workerTaskExecutorMap.remove(taskInstanceId);
+    }
+
+    /**
+     * Removes every registered executor. The executors themselves are not cancelled by this call.
+     */
+    public static void clear() {
+        workerTaskExecutorMap.clear();
+    }
+
+    /**
+     * Returns a live view of all registered executors.
+     * <p>
+     * The view is backed by the registry; iterating it while other threads register or remove executors is safe
+     * (weakly consistent) and does not throw {@link java.util.ConcurrentModificationException}.
+     *
+     * @return the registered executors
+     */
+    public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
+        return workerTaskExecutorMap.values();
+    }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
new file mode 100644
index 0000000000..28588c0dbf
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Thread pool that runs {@link WorkerTaskExecutor}s on the worker. The pool is created through
+ * {@code ThreadUtils.newDaemonFixedThreadExecutor} with {@code WorkerConfig#getExecThreads()} threads.
+ * Every accepted executor is also registered in {@link WorkerTaskExecutorHolder}.
+ */
+@Component
+@Slf4j
+public class WorkerTaskExecutorThreadPool {
+
+    private final ThreadPoolExecutor threadPoolExecutor;
+
+    private final WorkerConfig workerConfig;
+
+    public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
+        this.threadPoolExecutor =
+                ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
+        this.workerConfig = workerConfig;
+
+        WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
+        WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
+        WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
+        // The pool queue only holds tasks that are not running yet, so its size already is the number of
+        // waiting tasks; subtracting the active count would report negative values on a busy, empty queue.
+        WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(() -> threadPoolExecutor.getQueue().size());
+        WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount);
+    }
+
+    /**
+     * Registers the executor in {@link WorkerTaskExecutorHolder} and hands it to the pool.
+     *
+     * <p>With {@link TaskExecuteThreadsFullPolicy#CONTINUE} the executor is always accepted. With any
+     * other policy it is rejected while the pool {@link #isOverload() is overloaded}.
+     *
+     * @param workerTaskExecutor the executor to run
+     * @return {@code true} if the executor was accepted, {@code false} if it was rejected due to overload
+     * @throws IllegalArgumentException if an executor with the same task instance id is already registered
+     */
+    public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            boolean continueWhenFull =
+                    TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy());
+            if (!continueWhenFull && isOverload()) {
+                log.warn("WorkerTaskExecutorThreadPool is overload, cannot submit new WorkerTaskExecutor");
+                WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
+                return false;
+            }
+            WorkerTaskExecutorHolder.put(workerTaskExecutor);
+            // execute() enqueues the executor itself; submit() would wrap it in a FutureTask, and then
+            // killTaskBeforeExecuteByInstanceId could never find it in the queue to remove it.
+            threadPoolExecutor.execute(workerTaskExecutor);
+            return true;
+        }
+    }
+
+    /**
+     * @return {@code true} if at least one task is waiting in the pool queue
+     */
+    public boolean isOverload() {
+        return threadPoolExecutor.getQueue().size() > 0;
+    }
+
+    /**
+     * @return the number of tasks waiting in the pool queue
+     */
+    public int getWaitingTaskExecutorSize() {
+        return threadPoolExecutor.getQueue().size();
+    }
+
+    /**
+     * @return the active thread count reported by the pool
+     */
+    public int getRunningTaskExecutorSize() {
+        return threadPoolExecutor.getActiveCount();
+    }
+
+    /**
+     * Kill tasks that have not been executed, e.g. waiting in the queue.
+     * Does nothing if no executor is registered for the id. The executor stays registered in
+     * {@link WorkerTaskExecutorHolder}; unregistering it is left to the caller.
+     *
+     * @param taskInstanceId id of the task instance whose queued executor should be dropped
+     */
+    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
+            if (workerTaskExecutor == null) {
+                return;
+            }
+            threadPoolExecutor.remove(workerTaskExecutor);
+        }
+    }
+
+    /**
+     * Drops every task that is still waiting in the pool queue. Running tasks and the entries in
+     * {@link WorkerTaskExecutorHolder} are not touched.
+     */
+    public void clearTask() {
+        threadPoolExecutor.getQueue().clear();
+    }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 4d3ebc9aa9..8c6825df63 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -20,11 +20,12 @@ package
org.apache.dolphinscheduler.server.worker.runner.operator;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@@ -41,15 +42,16 @@ public class TaskInstanceDispatchOperationFunction
private WorkerConfig workerConfig;
@Autowired
- private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
+ private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;
+
+ @Autowired
+ private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
@Override
public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest
taskInstanceDispatchRequest) {
log.info("Receive TaskInstanceDispatchRequest: {}",
taskInstanceDispatchRequest);
TaskExecutionContext taskExecutionContext =
taskInstanceDispatchRequest.getTaskExecutionContext();
try {
- // set cache, it will be used when kill task
-
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
@@ -57,9 +59,11 @@ public class TaskInstanceDispatchOperationFunction
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
- if
(!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
- log.error("Submit task: {} to wait queue error, current queue
size: {} is full",
- taskExecutionContext.getTaskName(),
workerConfig.getExecThreads());
+ WorkerTaskExecutor workerTaskExecutor =
workerTaskExecutorFactoryBuilder
+
.createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor();
+ // todo: hold the workerTaskExecutor
+ if
(!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
+ log.info("Submit task: {} to wait queue failed",
taskExecutionContext.getTaskName());
return
TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"WorkerManagerThread is full");
} else {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index c5f4ffb78b..69e3994a90 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -23,13 +23,13 @@ import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRe
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@@ -45,7 +45,7 @@ public class TaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<TaskInstanceKillRequest,
TaskInstanceKillResponse> {
@Autowired
- private WorkerManagerThread workerManager;
+ private WorkerTaskExecutorThreadPool workerManager;
@Autowired
private MessageRetryRunner messageRetryRunner;
@@ -57,22 +57,24 @@ public class TaskInstanceKillOperationFunction
int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
try {
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- log.error("Cannot find TaskExecutionContext for taskInstance:
{}", taskInstanceId);
- return TaskInstanceKillResponse.fail("Cannot find
TaskExecutionContext");
+ WorkerTaskExecutor workerTaskExecutor =
WorkerTaskExecutorHolder.get(taskInstanceId);
+ if (workerTaskExecutor == null) {
+ log.error("Cannot find WorkerTaskExecutor for taskInstance:
{}", taskInstanceId);
+ return TaskInstanceKillResponse.fail("Cannot find
WorkerTaskExecutor");
}
+ TaskExecutionContext taskExecutionContext =
workerTaskExecutor.getTaskExecutionContext();
+
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
boolean result = doKill(taskExecutionContext);
- this.cancelApplication(taskInstanceId);
+ this.cancelApplication(workerTaskExecutor);
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ // todo: the task might be executed, but the processId is 0
+ WorkerTaskExecutorHolder.remove(taskInstanceId);
log.info("The task has not been executed and has been
cancelled, task id:{}", taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
}
@@ -80,7 +82,7 @@ public class TaskInstanceKillOperationFunction
taskExecutionContext
.setCurrentExecutionStatus(result ?
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
@@ -102,15 +104,11 @@ public class TaskInstanceKillOperationFunction
return processFlag;
}
- protected void cancelApplication(int taskInstanceId) {
- WorkerTaskExecutor workerTaskExecutor =
workerManager.getTaskExecuteThread(taskInstanceId);
- if (workerTaskExecutor == null) {
- log.warn("taskExecuteThread not found, taskInstanceId:{}",
taskInstanceId);
- return;
- }
+ protected void cancelApplication(WorkerTaskExecutor workerTaskExecutor) {
AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {
- log.warn("task not found, taskInstanceId:{}", taskInstanceId);
+ log.warn("task not found, taskInstanceId: {}",
+
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
return;
}
try {
@@ -118,7 +116,8 @@ public class TaskInstanceKillOperationFunction
} catch (Exception e) {
log.error("kill task error", e);
}
- log.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+ log.info("kill task by cancelApplication, taskInstanceId: {}",
+
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
}
protected boolean killProcess(String tenantCode, Integer processId) {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
index 6d31feff7e..7485b9230f 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
@@ -20,9 +20,10 @@ package
org.apache.dolphinscheduler.server.worker.runner.operator;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import lombok.extern.slf4j.Slf4j;
@@ -47,21 +48,29 @@ public class UpdateWorkflowHostOperationFunction
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
log.info("Received UpdateWorkflowHostRequest: {}",
updateWorkflowHostRequest);
- TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- log.error("Cannot find the taskExecutionContext for
taskInstance : {}", taskInstanceId);
- return UpdateWorkflowHostResponse.failed("Cannot find the
taskExecutionContext");
+ boolean updateWorkerTaskExecutor =
updateHostInWorkflowTaskExecutor(taskInstanceId, workflowHost);
+ boolean updateMessage = updateHostInMessage(taskInstanceId,
workflowHost);
+ if (updateWorkerTaskExecutor || updateMessage) {
+ return UpdateWorkflowHostResponse.success();
}
-
-
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
- taskExecutionContext.setWorkflowInstanceHost(workflowHost);
- messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost);
- log.info("Success update workflow host: {} for taskInstance: {}",
workflowHost, taskInstanceId);
- return UpdateWorkflowHostResponse.success();
+ return UpdateWorkflowHostResponse.failed("The taskInstance is not
in the worker");
} finally {
LogUtils.removeTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}
+
+ /**
+  * Points the execution context of the registered executor for the given task instance at the new
+  * workflow host.
+  *
+  * @return {@code true} if an executor was registered for the task instance and its context was updated,
+  *         {@code false} if no executor is registered
+  */
+ private boolean updateHostInWorkflowTaskExecutor(int taskInstanceId, String workflowHost) {
+     final WorkerTaskExecutor registeredExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
+     if (registeredExecutor != null) {
+         registeredExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
+         return true;
+     }
+     return false;
+ }
+
+ /**
+  * Forwards the new workflow host to the message retry runner.
+  *
+  * @return the result reported by {@code MessageRetryRunner#updateMessageHost}
+  */
+ private boolean updateHostInMessage(int taskInstanceId, String workflowHost) {
+     final boolean messageHostUpdated = messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost);
+     return messageHostUpdated;
+ }
}
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index d55ccc25af..e6043486f5 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
import java.util.Set;
@@ -67,7 +67,7 @@ public class WorkerRegistryClientTest {
private ScheduledExecutorService heartBeatExecutor;
@Mock
- private WorkerManagerThread workerManagerThread;
+ private WorkerTaskExecutorThreadPool workerManagerThread;
@Mock
private WorkerConnectStrategy workerConnectStrategy;
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
new file mode 100644
index 0000000000..988f1f7fec
--- /dev/null
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link WorkerTaskExecutorThreadPool}.
+ */
+class WorkerTaskExecutorThreadPoolTest {
+
+    @Test
+    public void testIsOverload() {
+        // WorkerTaskExecutorHolder is static state shared by the whole JVM: start from an empty registry
+        // so the sequential ids used by MockWorkerTaskExecutor cannot clash with leftovers.
+        WorkerTaskExecutorHolder.clear();
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setExecThreads(1);
+        workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
+        WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new WorkerTaskExecutorThreadPool(workerConfig);
+        try {
+            // submit 100 task, the thread pool size is 1
+            // assert the overload should be true
+            // assert the submitQueue should be 99
+            for (int i = 0; i < 100; i++) {
+                boolean submitResult =
+                        workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new MockWorkerTaskExecutor(() -> {
+                            try {
+                                Thread.sleep(10_000L);
+                            } catch (InterruptedException e) {
+                                // keep the interrupt visible to whoever runs this task
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        }));
+                Assertions.assertTrue(submitResult);
+            }
+            Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
+            Assertions.assertEquals(99, workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
+            Assertions.assertEquals(1, workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
+        } finally {
+            // do not leave 99 queued sleepers and 100 registry entries behind for other tests
+            workerTaskExecutorThreadPool.clearTask();
+            WorkerTaskExecutorHolder.clear();
+        }
+    }
+
+    /**
+     * A {@link WorkerTaskExecutor} whose whole execution is the supplied {@link Runnable}.
+     */
+    static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
+
+        // (int) System.nanoTime() is not guaranteed to differ between two calls; a sequence is.
+        private static int taskInstanceIdSequence = 0;
+
+        private final Runnable runnable;
+
+        protected MockWorkerTaskExecutor(Runnable runnable) {
+            super(TaskExecutionContext.builder().taskInstanceId(nextTaskInstanceId()).build(), new WorkerConfig(),
+                    new WorkerMessageSender(), new TaskPluginManager(), new NoOpStorageOperate(),
+                    new WorkerRegistryClient());
+            this.runnable = runnable;
+        }
+
+        private static synchronized int nextTaskInstanceId() {
+            return ++taskInstanceIdSequence;
+        }
+
+        @Override
+        public void run() {
+            executeTask(new TaskCallbackImpl(null, null));
+        }
+
+        @Override
+        protected void executeTask(TaskCallBack taskCallBack) {
+            runnable.run();
+        }
+    }
+
+    /**
+     * {@link StorageOperate} stub: every operation does nothing and returns {@code null} or {@code false}.
+     */
+    static class NoOpStorageOperate implements StorageOperate {
+
+        @Override
+        public void createTenantDirIfNotExists(String tenantCode) {
+
+        }
+
+        @Override
+        public String getResDir(String tenantCode) {
+            return null;
+        }
+
+        @Override
+        public String getUdfDir(String tenantCode) {
+            return null;
+        }
+
+        @Override
+        public boolean mkdir(String tenantCode, String path) throws IOException {
+            return false;
+        }
+
+        @Override
+        public String getResourceFullName(String tenantCode, String fileName) {
+            return null;
+        }
+
+        @Override
+        public String getResourceFileName(String tenantCode, String fullName) {
+            return null;
+        }
+
+        @Override
+        public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
+            return null;
+        }
+
+        @Override
+        public boolean exists(String fullName) {
+            return false;
+        }
+
+        @Override
+        public boolean delete(String filePath, boolean recursive) {
+            return false;
+        }
+
+        @Override
+        public boolean delete(String filePath, List<String> childrenPathArray, boolean recursive) {
+            return false;
+        }
+
+        @Override
+        public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) {
+            return false;
+        }
+
+        @Override
+        public String getDir(ResourceType resourceType, String tenantCode) {
+            return null;
+        }
+
+        @Override
+        public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
+                              boolean overwrite) {
+            return false;
+        }
+
+        @Override
+        public void download(String srcFilePath, String dstFile, boolean overwrite) {
+
+        }
+
+        @Override
+        public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) {
+            return null;
+        }
+
+        @Override
+        public void deleteTenant(String tenantCode) {
+
+        }
+
+        @Override
+        public ResUploadType returnStorageType() {
+            return null;
+        }
+
+        @Override
+        public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
+                                                              ResourceType type) {
+            return null;
+        }
+
+        @Override
+        public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
+                                                   ResourceType type) throws Exception {
+            return null;
+        }
+
+        @Override
+        public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
+                                           ResourceType type) throws Exception {
+            return null;
+        }
+    }
+
+}