This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e3bd26322f [Improvement][UT] Improve Worker runner coverage (#15428)
e3bd26322f is described below
commit e3bd26322fcb75f620c5733c6f009823fb74dc3a
Author: John Huang <[email protected]>
AuthorDate: Tue Mar 5 18:32:32 2024 +0800
[Improvement][UT] Improve Worker runner coverage (#15428)
Co-authored-by: Rick Cheng <[email protected]>
Co-authored-by: caishunfeng <[email protected]>
---
.../runner/WorkerTaskExecutorFactoryBuilder.java | 15 ++
...anceExecutionEventAckListenFunctionManager.java | 9 +
...tanceExecutionFinishEventAckListenFunction.java | 4 +
...nstanceExecutionInfoEventAckListenFunction.java | 4 +
...anceExecutionRunningEventAckListenFunction.java | 3 +
.../TaskInstanceDispatchOperationFunction.java | 9 +
.../TaskInstanceKillOperationFunction.java | 7 +
.../TaskInstanceOperationFunctionManager.java | 11 +
.../UpdateWorkflowHostOperationFunction.java | 4 +
...nstanceExecutionEventAckListenFunctionTest.java | 104 ++++++++
.../TaskInstanceOperationFunctionTest.java | 280 +++++++++++++++++++++
11 files changed, 450 insertions(+)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index a9c2948482..599746818d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -48,6 +48,21 @@ public class WorkerTaskExecutorFactoryBuilder {
@Autowired
private WorkerRegistryClient workerRegistryClient;
+ public WorkerTaskExecutorFactoryBuilder(
+ WorkerConfig workerConfig,
+ WorkerMessageSender
workerMessageSender,
+ TaskPluginManager
taskPluginManager,
+ WorkerTaskExecutorThreadPool
workerManager,
+ StorageOperate storageOperate,
+ WorkerRegistryClient
workerRegistryClient) {
+ this.workerConfig = workerConfig;
+ this.workerMessageSender = workerMessageSender;
+ this.taskPluginManager = taskPluginManager;
+ this.workerManager = workerManager;
+ this.storageOperate = storageOperate;
+ this.workerRegistryClient = workerRegistryClient;
+ }
+
public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor>
createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
index b4423f4880..3214be8c89 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
@@ -35,6 +35,15 @@ public class TaskInstanceExecutionEventAckListenFunctionManager {
@Autowired
private TaskInstanceExecutionInfoEventAckListenFunction
taskInstanceExecutionInfoEventAckListenFunction;
+ public TaskInstanceExecutionEventAckListenFunctionManager(
+
TaskInstanceExecutionRunningEventAckListenFunction
taskInstanceExecutionRunningEventAckListenFunction,
+
TaskInstanceExecutionFinishEventAckListenFunction
taskInstanceExecutionFinishEventAckListenFunction,
+
TaskInstanceExecutionInfoEventAckListenFunction
taskInstanceExecutionInfoEventAckListenFunction) {
+ this.taskInstanceExecutionRunningEventAckListenFunction =
taskInstanceExecutionRunningEventAckListenFunction;
+ this.taskInstanceExecutionFinishEventAckListenFunction =
taskInstanceExecutionFinishEventAckListenFunction;
+ this.taskInstanceExecutionInfoEventAckListenFunction =
taskInstanceExecutionInfoEventAckListenFunction;
+ }
+
public TaskInstanceExecutionRunningEventAckListenFunction
getTaskInstanceExecutionRunningEventAckListenFunction() {
return taskInstanceExecutionRunningEventAckListenFunction;
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
index ad7892bc7a..a358623519 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
@@ -36,6 +36,10 @@ public class TaskInstanceExecutionFinishEventAckListenFunction
@Autowired
private MessageRetryRunner messageRetryRunner;
+ public
TaskInstanceExecutionFinishEventAckListenFunction(MessageRetryRunner
messageRetryRunner) {
+ this.messageRetryRunner = messageRetryRunner;
+ }
+
@Override
public void
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionFinishEventAck
taskInstanceExecutionFinishEventAck) {
try {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
index 971343103a..b3dcc9bf8a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
@@ -37,6 +37,10 @@ public class TaskInstanceExecutionInfoEventAckListenFunction
@Resource
private MessageRetryRunner messageRetryRunner;
+ public TaskInstanceExecutionInfoEventAckListenFunction(MessageRetryRunner
messageRetryRunner) {
+ this.messageRetryRunner = messageRetryRunner;
+ }
+
@Override
public void
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionInfoEventAck
taskInstanceExecutionInfoEventAck) {
try {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
index 9d6de78e02..e17d72ad99 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
@@ -36,6 +36,9 @@ public class TaskInstanceExecutionRunningEventAckListenFunction
@Autowired
private MessageRetryRunner messageRetryRunner;
+ public
TaskInstanceExecutionRunningEventAckListenFunction(MessageRetryRunner
messageRetryRunner) {
+ this.messageRetryRunner = messageRetryRunner;
+ }
@Override
public void
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionRunningEventAck
taskInstanceExecutionRunningEventAck) {
try {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index e6d259412f..fc128a9a34 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -48,6 +48,15 @@ public class TaskInstanceDispatchOperationFunction
@Autowired
private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
+ public TaskInstanceDispatchOperationFunction(
+ WorkerConfig workerConfig,
+
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder,
+ WorkerTaskExecutorThreadPool
workerTaskExecutorThreadPool) {
+ this.workerConfig = workerConfig;
+ this.workerTaskExecutorFactoryBuilder =
workerTaskExecutorFactoryBuilder;
+ this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool;
+ }
+
@Override
public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest
taskInstanceDispatchRequest) {
log.info("Receive TaskInstanceDispatchRequest: {}",
taskInstanceDispatchRequest);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index 69e3994a90..d55765d23f 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -50,6 +50,13 @@ public class TaskInstanceKillOperationFunction
@Autowired
private MessageRetryRunner messageRetryRunner;
+ public TaskInstanceKillOperationFunction(
+ WorkerTaskExecutorThreadPool
workerManager,
+ MessageRetryRunner
messageRetryRunner) {
+ this.workerManager = workerManager;
+ this.messageRetryRunner = messageRetryRunner;
+ }
+
@Override
public TaskInstanceKillResponse operate(TaskInstanceKillRequest
taskInstanceKillRequest) {
log.info("Receive TaskInstanceKillRequest: {}",
taskInstanceKillRequest);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
index 99ae193b47..8014b88fd1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
@@ -35,6 +35,17 @@ public class TaskInstanceOperationFunctionManager {
@Autowired
private TaskInstancePauseOperationFunction
taskInstancePauseOperationFunction;
+ public TaskInstanceOperationFunctionManager(
+
TaskInstanceKillOperationFunction taskInstanceKillOperationFunction,
+
UpdateWorkflowHostOperationFunction updateWorkflowHostOperationFunction,
+
TaskInstanceDispatchOperationFunction taskInstanceDispatchOperationFunction,
+
TaskInstancePauseOperationFunction taskInstancePauseOperationFunction) {
+ this.taskInstanceKillOperationFunction =
taskInstanceKillOperationFunction;
+ this.updateWorkflowHostOperationFunction =
updateWorkflowHostOperationFunction;
+ this.taskInstanceDispatchOperationFunction =
taskInstanceDispatchOperationFunction;
+ this.taskInstancePauseOperationFunction =
taskInstancePauseOperationFunction;
+ }
+
public TaskInstanceKillOperationFunction
getTaskInstanceKillOperationFunction() {
return taskInstanceKillOperationFunction;
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
index 7485b9230f..c0ab345450 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
@@ -39,6 +39,10 @@ public class UpdateWorkflowHostOperationFunction
@Autowired
private MessageRetryRunner messageRetryRunner;
+ public UpdateWorkflowHostOperationFunction(MessageRetryRunner
messageRetryRunner) {
+ this.messageRetryRunner = messageRetryRunner;
+ }
+
@Override
public UpdateWorkflowHostResponse operate(UpdateWorkflowHostRequest
updateWorkflowHostRequest) {
try {
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java
new file mode 100644
index 0000000000..5044fba11e
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner.listener;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.times;
+
+import
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionFinishEventAck;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionInfoEventAck;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionRunningEventAck;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskInstanceExecutionEventAckListenFunctionTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(TaskInstanceExecutionEventAckListenFunctionTest.class);
+ private MessageRetryRunner messageRetryRunner =
Mockito.mock(MessageRetryRunner.class);
+
+ @Test
+ public void testTaskInstanceExecutionEventAckListenFunctionManager() {
+ TaskInstanceExecutionFinishEventAckListenFunction
taskInstanceExecutionFinishEventAckListenFunction =
+ new
TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner);
+ TaskInstanceExecutionInfoEventAckListenFunction
taskInstanceExecutionInfoEventAckListenFunction =
+ new
TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner);
+ TaskInstanceExecutionRunningEventAckListenFunction
taskInstanceExecutionRunningEventAckListenFunction =
+ new
TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner);
+ TaskInstanceExecutionEventAckListenFunctionManager
taskInstanceExecutionEventAckListenFunctionManager =
+ new TaskInstanceExecutionEventAckListenFunctionManager(
+ taskInstanceExecutionRunningEventAckListenFunction,
+ taskInstanceExecutionFinishEventAckListenFunction,
+ taskInstanceExecutionInfoEventAckListenFunction);
+
Assertions.assertEquals(taskInstanceExecutionRunningEventAckListenFunction,
+ taskInstanceExecutionEventAckListenFunctionManager
+
.getTaskInstanceExecutionRunningEventAckListenFunction());
+
Assertions.assertEquals(taskInstanceExecutionInfoEventAckListenFunction,
+ taskInstanceExecutionEventAckListenFunctionManager
+ .getTaskInstanceExecutionInfoEventAckListenFunction());
+
Assertions.assertEquals(taskInstanceExecutionFinishEventAckListenFunction,
+ taskInstanceExecutionEventAckListenFunctionManager
+
.getTaskInstanceExecutionFinishEventAckListenFunction());
+ }
+
+ @Test
+ public void testTaskInstanceExecutionEventAckListenFunctionDryRun() {
+ int taskInstanceId1 = 111;
+ int taskInstanceId2 = 222;
+ int taskInstanceId3 = 333;
+ TaskInstanceExecutionFinishEventAckListenFunction
taskInstanceExecutionFinishEventAckListenFunction =
+ new
TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner);
+
taskInstanceExecutionFinishEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+ TaskInstanceExecutionFinishEventAck.success(taskInstanceId1));
+
+ ArgumentCaptor acInt = ArgumentCaptor.forClass(int.class);
+ ArgumentCaptor acEventType =
+
ArgumentCaptor.forClass(ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.class);
+
+ Mockito.verify(messageRetryRunner, times(1)).removeRetryMessage(
+ (int) acInt.capture(),
+ (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType)
acEventType.capture());
+
+ assertEquals(taskInstanceId1, acInt.getValue());
+
+ TaskInstanceExecutionInfoEventAckListenFunction
taskInstanceExecutionInfoEventAckListenFunction =
+ new
TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner);
+
taskInstanceExecutionInfoEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+ TaskInstanceExecutionInfoEventAck.success(taskInstanceId2));
+
+ Mockito.verify(messageRetryRunner, times(2)).removeRetryMessage(
+ (int) acInt.capture(),
+ (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType)
acEventType.capture());
+ assertEquals(taskInstanceId2, acInt.getValue());
+
+ TaskInstanceExecutionRunningEventAckListenFunction
taskInstanceExecutionRunningEventAckListenFunction =
+ new
TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner);
+
taskInstanceExecutionRunningEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+ TaskInstanceExecutionRunningEventAck.success(taskInstanceId3));
+ Mockito.verify(messageRetryRunner, times(3)).removeRetryMessage(
+ (int) acInt.capture(),
+ (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType)
acEventType.capture());
+ assertEquals(taskInstanceId3, acInt.getValue());
+ }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
new file mode 100644
index 0000000000..592340214f
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner.operator;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
+import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
+import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
+import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskInstanceOperationFunctionTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(TaskInstanceOperationFunctionTest.class);
+ private MessageRetryRunner messageRetryRunner =
Mockito.mock(MessageRetryRunner.class);
+
+ private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
+
+ private TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
+
+ private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+ Mockito.mock(WorkerTaskExecutorThreadPool.class);
+
+ private WorkerTaskExecutor workerTaskExecutor =
Mockito.mock(WorkerTaskExecutor.class);
+
+ private AbstractTask task = Mockito.mock(AbstractTask.class);
+
+ private WorkerMessageSender workerMessageSender =
Mockito.mock(WorkerMessageSender.class);
+
+ private TaskPluginManager taskPluginManager =
Mockito.mock(TaskPluginManager.class);
+
+ private WorkerTaskExecutorThreadPool workerManager =
Mockito.mock(WorkerTaskExecutorThreadPool.class);
+
+ private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
+
+ private WorkerRegistryClient workerRegistryClient =
Mockito.mock(WorkerRegistryClient.class);
+
+ @Test
+ public void testTaskInstanceOperationFunctionManager() {
+ TaskInstanceKillOperationFunction taskInstanceKillOperationFunction =
new TaskInstanceKillOperationFunction(
+ workerTaskExecutorThreadPool,
+ messageRetryRunner);
+
+ TaskInstancePauseOperationFunction taskInstancePauseOperationFunction =
+ new TaskInstancePauseOperationFunction();
+
+ UpdateWorkflowHostOperationFunction
updateWorkflowHostOperationFunction =
+ new UpdateWorkflowHostOperationFunction(
+ messageRetryRunner);
+
+ WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder =
new WorkerTaskExecutorFactoryBuilder(
+ workerConfig,
+ workerMessageSender,
+ taskPluginManager,
+ workerManager,
+ storageOperate,
+ workerRegistryClient);
+
+ TaskInstanceDispatchOperationFunction
taskInstanceDispatchOperationFunction =
+ new TaskInstanceDispatchOperationFunction(
+ workerConfig,
+ workerTaskExecutorFactoryBuilder,
+ workerTaskExecutorThreadPool);
+
+ TaskInstanceOperationFunctionManager
taskInstanceOperationFunctionManager =
+ new TaskInstanceOperationFunctionManager(
+ taskInstanceKillOperationFunction,
+ updateWorkflowHostOperationFunction,
+ taskInstanceDispatchOperationFunction,
+ taskInstancePauseOperationFunction);
+
+ Assertions.assertEquals(taskInstanceKillOperationFunction,
+
taskInstanceOperationFunctionManager.getTaskInstanceKillOperationFunction());
+ Assertions.assertEquals(taskInstancePauseOperationFunction,
+
taskInstanceOperationFunctionManager.getTaskInstancePauseOperationFunction());
+ Assertions.assertEquals(updateWorkflowHostOperationFunction,
+
taskInstanceOperationFunctionManager.getUpdateWorkflowHostOperationFunction());
+ Assertions.assertEquals(taskInstanceDispatchOperationFunction,
+
taskInstanceOperationFunctionManager.getTaskInstanceDispatchOperationFunction());
+ }
+
+ @Test
+ public void testUpdateWorkflowHostOperationFunction() {
+ UpdateWorkflowHostOperationFunction
updateWorkflowHostOperationFunction =
+ new UpdateWorkflowHostOperationFunction(
+ messageRetryRunner);
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+ .setTaskInstanceIdMDC(any(Integer.class)))
+ .then(invocationOnMock -> null);
+ UpdateWorkflowHostRequest request = new
UpdateWorkflowHostRequest();
+ request.setTaskInstanceId(1);
+ request.setWorkflowHost("host");
+ UpdateWorkflowHostResponse taskInstanceDispatchResponse =
updateWorkflowHostOperationFunction.operate(
+ request);
+ Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(),
false);
+ }
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+ .setTaskInstanceIdMDC(any(Integer.class)))
+ .then(invocationOnMock -> null);
+
+ try (
+ MockedStatic<WorkerTaskExecutorHolder>
workerTaskExecutorHolderMockedStatic =
+
Mockito.mockStatic(WorkerTaskExecutorHolder.class)) {
+
given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext);
+ workerTaskExecutorHolderMockedStatic
+ .when(() ->
WorkerTaskExecutorHolder.get(any(Integer.class)))
+ .thenReturn(workerTaskExecutor);
+ int taskInstanceId = 111;
+ UpdateWorkflowHostRequest request = new
UpdateWorkflowHostRequest();
+ request.setTaskInstanceId(taskInstanceId);
+ request.setWorkflowHost("host");
+
+ UpdateWorkflowHostResponse taskInstanceDispatchResponse =
updateWorkflowHostOperationFunction.operate(
+ request);
+
Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), true);
+ }
+ }
+ }
+
+ @Test
+ public void testTaskInstancePauseOperationFunction() {
+ TaskInstancePauseOperationFunction taskInstancePauseOperationFunction =
+ new TaskInstancePauseOperationFunction();
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+ .setTaskInstanceIdMDC(any(Integer.class)))
+ .then(invocationOnMock -> null);
+ TaskInstancePauseRequest request = new TaskInstancePauseRequest();
+ request.setTaskInstanceId(1);
+ TaskInstancePauseResponse taskInstanceDispatchResponse =
taskInstancePauseOperationFunction.operate(
+ request);
+ Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(),
true);
+ }
+ }
+
+ @Test
+ public void testTaskInstanceDispatchOperationFunction() {
+ WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder =
new WorkerTaskExecutorFactoryBuilder(
+ workerConfig,
+ workerMessageSender,
+ taskPluginManager,
+ workerManager,
+ storageOperate,
+ workerRegistryClient);
+
+ TaskInstanceDispatchOperationFunction
taskInstanceDispatchOperationFunction =
+ new TaskInstanceDispatchOperationFunction(
+ workerConfig,
+ workerTaskExecutorFactoryBuilder,
+ workerTaskExecutorThreadPool);
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+
.getTaskInstanceLogFullPath(any(TaskExecutionContext.class)))
+ .thenReturn("test");
+ TaskInstanceDispatchResponse taskInstanceDispatchResponse =
taskInstanceDispatchOperationFunction.operate(
+ new TaskInstanceDispatchRequest(taskExecutionContext));
+
Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(),
false);
+ logUtilsMockedStatic.verify(times(1), () ->
LogUtils.removeWorkflowAndTaskInstanceIdMDC());
+
+
given(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(any())).willReturn(true);
+ taskInstanceDispatchResponse =
taskInstanceDispatchOperationFunction.operate(
+ new TaskInstanceDispatchRequest(taskExecutionContext));
+
Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(), true);
+ logUtilsMockedStatic.verify(times(2), () ->
LogUtils.removeWorkflowAndTaskInstanceIdMDC());
+ }
+ }
+
+ @Test
+ public void testTaskInstanceKillOperationFunction() {
+ TaskInstanceKillOperationFunction taskInstanceKillOperationFunction =
new TaskInstanceKillOperationFunction(
+ workerManager,
+ messageRetryRunner);
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ int taskInstanceId = 111;
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+ .setTaskInstanceLogFullPathMDC(any(String.class)))
+ .then(invocationOnMock -> null);
+ TaskInstanceKillResponse response =
taskInstanceKillOperationFunction.operate(
+ new TaskInstanceKillRequest(taskInstanceId));
+ Assertions.assertEquals("Cannot find WorkerTaskExecutor",
response.getMessage());
+ }
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ int processId = 12;
+ int taskInstanceId = 111;
+ Mockito.reset(taskExecutionContext);
+ given(taskExecutionContext.getProcessId()).willReturn(processId);
+ given(taskExecutionContext.getLogPath()).willReturn("logpath");
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+ .setTaskInstanceLogFullPathMDC(any(String.class)))
+ .then(invocationOnMock -> null);
+ taskInstanceKillOperationFunction.operate(
+ new TaskInstanceKillRequest(taskInstanceId));
+ logUtilsMockedStatic.verify(times(1), () ->
LogUtils.removeTaskInstanceIdMDC());
+ logUtilsMockedStatic.verify(times(1), () ->
LogUtils.removeTaskInstanceLogFullPathMDC());
+ }
+
+ try (MockedStatic<LogUtils> logUtilsMockedStatic =
Mockito.mockStatic(LogUtils.class)) {
+ try (
+ MockedStatic<WorkerTaskExecutorHolder>
workerTaskExecutorHolderMockedStatic =
+
Mockito.mockStatic(WorkerTaskExecutorHolder.class)) {
+
given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext);
+ workerTaskExecutorHolderMockedStatic
+ .when(() ->
WorkerTaskExecutorHolder.get(any(Integer.class)))
+ .thenReturn(workerTaskExecutor);
+ int processId = 12;
+ int taskInstanceId = 111;
+ Mockito.reset(taskExecutionContext);
+
given(taskExecutionContext.getProcessId()).willReturn(processId);
+ given(taskExecutionContext.getLogPath()).willReturn("logpath");
+ logUtilsMockedStatic
+ .when(() -> LogUtils
+
.setTaskInstanceLogFullPathMDC(any(String.class)))
+ .then(invocationOnMock -> null);
+ when(workerTaskExecutor.getTask()).thenReturn(task);
+ //
given(workerManager.getTaskExecuteThread(taskInstanceId)).willReturn(workerTaskExecutor);
+ taskInstanceKillOperationFunction.operate(
+ new TaskInstanceKillRequest(taskInstanceId));
+ verify(task, times(1)).cancel();
+ }
+
+ }
+ }
+}