This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e3bd26322f [Improvement][UT] Improve Worker runner coverage (#15428)
e3bd26322f is described below

commit e3bd26322fcb75f620c5733c6f009823fb74dc3a
Author: John Huang <[email protected]>
AuthorDate: Tue Mar 5 18:32:32 2024 +0800

    [Improvement][UT] Improve Worker runner coverage (#15428)
    
    Co-authored-by: Rick Cheng <[email protected]>
    Co-authored-by: caishunfeng <[email protected]>
---
 .../runner/WorkerTaskExecutorFactoryBuilder.java   |  15 ++
 ...anceExecutionEventAckListenFunctionManager.java |   9 +
 ...tanceExecutionFinishEventAckListenFunction.java |   4 +
 ...nstanceExecutionInfoEventAckListenFunction.java |   4 +
 ...anceExecutionRunningEventAckListenFunction.java |   3 +
 .../TaskInstanceDispatchOperationFunction.java     |   9 +
 .../TaskInstanceKillOperationFunction.java         |   7 +
 .../TaskInstanceOperationFunctionManager.java      |  11 +
 .../UpdateWorkflowHostOperationFunction.java       |   4 +
 ...nstanceExecutionEventAckListenFunctionTest.java | 104 ++++++++
 .../TaskInstanceOperationFunctionTest.java         | 280 +++++++++++++++++++++
 11 files changed, 450 insertions(+)

diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index a9c2948482..599746818d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -48,6 +48,21 @@ public class WorkerTaskExecutorFactoryBuilder {
     @Autowired
     private WorkerRegistryClient workerRegistryClient;
 
+    public WorkerTaskExecutorFactoryBuilder(
+                                            WorkerConfig workerConfig,
+                                            WorkerMessageSender 
workerMessageSender,
+                                            TaskPluginManager 
taskPluginManager,
+                                            WorkerTaskExecutorThreadPool 
workerManager,
+                                            StorageOperate storageOperate,
+                                            WorkerRegistryClient 
workerRegistryClient) {
+        this.workerConfig = workerConfig;
+        this.workerMessageSender = workerMessageSender;
+        this.taskPluginManager = taskPluginManager;
+        this.workerManager = workerManager;
+        this.storageOperate = storageOperate;
+        this.workerRegistryClient = workerRegistryClient;
+    }
+
     public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> 
createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
         return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
                 workerConfig,
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
index b4423f4880..3214be8c89 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java
@@ -35,6 +35,15 @@ public class 
TaskInstanceExecutionEventAckListenFunctionManager {
     @Autowired
     private TaskInstanceExecutionInfoEventAckListenFunction 
taskInstanceExecutionInfoEventAckListenFunction;
 
+    public TaskInstanceExecutionEventAckListenFunctionManager(
+                                                              
TaskInstanceExecutionRunningEventAckListenFunction 
taskInstanceExecutionRunningEventAckListenFunction,
+                                                              
TaskInstanceExecutionFinishEventAckListenFunction 
taskInstanceExecutionFinishEventAckListenFunction,
+                                                              
TaskInstanceExecutionInfoEventAckListenFunction 
taskInstanceExecutionInfoEventAckListenFunction) {
+        this.taskInstanceExecutionRunningEventAckListenFunction = 
taskInstanceExecutionRunningEventAckListenFunction;
+        this.taskInstanceExecutionFinishEventAckListenFunction = 
taskInstanceExecutionFinishEventAckListenFunction;
+        this.taskInstanceExecutionInfoEventAckListenFunction = 
taskInstanceExecutionInfoEventAckListenFunction;
+    }
+
     public TaskInstanceExecutionRunningEventAckListenFunction 
getTaskInstanceExecutionRunningEventAckListenFunction() {
         return taskInstanceExecutionRunningEventAckListenFunction;
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
index ad7892bc7a..a358623519 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java
@@ -36,6 +36,10 @@ public class 
TaskInstanceExecutionFinishEventAckListenFunction
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    public 
TaskInstanceExecutionFinishEventAckListenFunction(MessageRetryRunner 
messageRetryRunner) {
+        this.messageRetryRunner = messageRetryRunner;
+    }
+
     @Override
     public void 
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionFinishEventAck 
taskInstanceExecutionFinishEventAck) {
         try {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
index 971343103a..b3dcc9bf8a 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java
@@ -37,6 +37,10 @@ public class TaskInstanceExecutionInfoEventAckListenFunction
     @Resource
     private MessageRetryRunner messageRetryRunner;
 
+    public TaskInstanceExecutionInfoEventAckListenFunction(MessageRetryRunner 
messageRetryRunner) {
+        this.messageRetryRunner = messageRetryRunner;
+    }
+
     @Override
     public void 
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionInfoEventAck 
taskInstanceExecutionInfoEventAck) {
         try {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
index 9d6de78e02..e17d72ad99 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java
@@ -36,6 +36,9 @@ public class 
TaskInstanceExecutionRunningEventAckListenFunction
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    public 
TaskInstanceExecutionRunningEventAckListenFunction(MessageRetryRunner 
messageRetryRunner) {
+        this.messageRetryRunner = messageRetryRunner;
+    }
     @Override
     public void 
handleTaskInstanceExecutionEventAck(TaskInstanceExecutionRunningEventAck 
taskInstanceExecutionRunningEventAck) {
         try {
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index e6d259412f..fc128a9a34 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -48,6 +48,15 @@ public class TaskInstanceDispatchOperationFunction
     @Autowired
     private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
 
+    public TaskInstanceDispatchOperationFunction(
+                                                 WorkerConfig workerConfig,
+                                                 
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder,
+                                                 WorkerTaskExecutorThreadPool 
workerTaskExecutorThreadPool) {
+        this.workerConfig = workerConfig;
+        this.workerTaskExecutorFactoryBuilder = 
workerTaskExecutorFactoryBuilder;
+        this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool;
+    }
+
     @Override
     public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest 
taskInstanceDispatchRequest) {
         log.info("Receive TaskInstanceDispatchRequest: {}", 
taskInstanceDispatchRequest);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index 69e3994a90..d55765d23f 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -50,6 +50,13 @@ public class TaskInstanceKillOperationFunction
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    public TaskInstanceKillOperationFunction(
+                                             WorkerTaskExecutorThreadPool 
workerManager,
+                                             MessageRetryRunner 
messageRetryRunner) {
+        this.workerManager = workerManager;
+        this.messageRetryRunner = messageRetryRunner;
+    }
+
     @Override
     public TaskInstanceKillResponse operate(TaskInstanceKillRequest 
taskInstanceKillRequest) {
         log.info("Receive TaskInstanceKillRequest: {}", 
taskInstanceKillRequest);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
index 99ae193b47..8014b88fd1 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java
@@ -35,6 +35,17 @@ public class TaskInstanceOperationFunctionManager {
     @Autowired
     private TaskInstancePauseOperationFunction 
taskInstancePauseOperationFunction;
 
+    public TaskInstanceOperationFunctionManager(
+                                                
TaskInstanceKillOperationFunction taskInstanceKillOperationFunction,
+                                                
UpdateWorkflowHostOperationFunction updateWorkflowHostOperationFunction,
+                                                
TaskInstanceDispatchOperationFunction taskInstanceDispatchOperationFunction,
+                                                
TaskInstancePauseOperationFunction taskInstancePauseOperationFunction) {
+        this.taskInstanceKillOperationFunction = 
taskInstanceKillOperationFunction;
+        this.updateWorkflowHostOperationFunction = 
updateWorkflowHostOperationFunction;
+        this.taskInstanceDispatchOperationFunction = 
taskInstanceDispatchOperationFunction;
+        this.taskInstancePauseOperationFunction = 
taskInstancePauseOperationFunction;
+    }
+
     public TaskInstanceKillOperationFunction 
getTaskInstanceKillOperationFunction() {
         return taskInstanceKillOperationFunction;
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
index 7485b9230f..c0ab345450 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
@@ -39,6 +39,10 @@ public class UpdateWorkflowHostOperationFunction
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    public UpdateWorkflowHostOperationFunction(MessageRetryRunner 
messageRetryRunner) {
+        this.messageRetryRunner = messageRetryRunner;
+    }
+
     @Override
     public UpdateWorkflowHostResponse operate(UpdateWorkflowHostRequest 
updateWorkflowHostRequest) {
         try {
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java
new file mode 100644
index 0000000000..5044fba11e
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner.listener;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.times;
+
+import 
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionFinishEventAck;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionInfoEventAck;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionRunningEventAck;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskInstanceExecutionEventAckListenFunctionTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TaskInstanceExecutionEventAckListenFunctionTest.class);
+    private MessageRetryRunner messageRetryRunner = 
Mockito.mock(MessageRetryRunner.class);
+
+    @Test
+    public void testTaskInstanceExecutionEventAckListenFunctionManager() {
+        TaskInstanceExecutionFinishEventAckListenFunction 
taskInstanceExecutionFinishEventAckListenFunction =
+                new 
TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner);
+        TaskInstanceExecutionInfoEventAckListenFunction 
taskInstanceExecutionInfoEventAckListenFunction =
+                new 
TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner);
+        TaskInstanceExecutionRunningEventAckListenFunction 
taskInstanceExecutionRunningEventAckListenFunction =
+                new 
TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner);
+        TaskInstanceExecutionEventAckListenFunctionManager 
taskInstanceExecutionEventAckListenFunctionManager =
+                new TaskInstanceExecutionEventAckListenFunctionManager(
+                        taskInstanceExecutionRunningEventAckListenFunction,
+                        taskInstanceExecutionFinishEventAckListenFunction,
+                        taskInstanceExecutionInfoEventAckListenFunction);
+        
Assertions.assertEquals(taskInstanceExecutionRunningEventAckListenFunction,
+                taskInstanceExecutionEventAckListenFunctionManager
+                        
.getTaskInstanceExecutionRunningEventAckListenFunction());
+        
Assertions.assertEquals(taskInstanceExecutionInfoEventAckListenFunction,
+                taskInstanceExecutionEventAckListenFunctionManager
+                        .getTaskInstanceExecutionInfoEventAckListenFunction());
+        
Assertions.assertEquals(taskInstanceExecutionFinishEventAckListenFunction,
+                taskInstanceExecutionEventAckListenFunctionManager
+                        
.getTaskInstanceExecutionFinishEventAckListenFunction());
+    }
+
+    @Test
+    public void testTaskInstanceExecutionEventAckListenFunctionDryRun() {
+        int taskInstanceId1 = 111;
+        int taskInstanceId2 = 222;
+        int taskInstanceId3 = 333;
+        TaskInstanceExecutionFinishEventAckListenFunction 
taskInstanceExecutionFinishEventAckListenFunction =
+                new 
TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner);
+        
taskInstanceExecutionFinishEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+                TaskInstanceExecutionFinishEventAck.success(taskInstanceId1));
+
+        ArgumentCaptor acInt = ArgumentCaptor.forClass(int.class);
+        ArgumentCaptor acEventType =
+                
ArgumentCaptor.forClass(ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.class);
+
+        Mockito.verify(messageRetryRunner, times(1)).removeRetryMessage(
+                (int) acInt.capture(),
+                (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) 
acEventType.capture());
+
+        assertEquals(taskInstanceId1, acInt.getValue());
+
+        TaskInstanceExecutionInfoEventAckListenFunction 
taskInstanceExecutionInfoEventAckListenFunction =
+                new 
TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner);
+        
taskInstanceExecutionInfoEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+                TaskInstanceExecutionInfoEventAck.success(taskInstanceId2));
+
+        Mockito.verify(messageRetryRunner, times(2)).removeRetryMessage(
+                (int) acInt.capture(),
+                (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) 
acEventType.capture());
+        assertEquals(taskInstanceId2, acInt.getValue());
+
+        TaskInstanceExecutionRunningEventAckListenFunction 
taskInstanceExecutionRunningEventAckListenFunction =
+                new 
TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner);
+        
taskInstanceExecutionRunningEventAckListenFunction.handleTaskInstanceExecutionEventAck(
+                TaskInstanceExecutionRunningEventAck.success(taskInstanceId3));
+        Mockito.verify(messageRetryRunner, times(3)).removeRetryMessage(
+                (int) acInt.capture(),
+                (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) 
acEventType.capture());
+        assertEquals(taskInstanceId3, acInt.getValue());
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
new file mode 100644
index 0000000000..592340214f
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner.operator;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
+import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
+import 
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskInstanceOperationFunctionTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TaskInstanceOperationFunctionTest.class);
+    private MessageRetryRunner messageRetryRunner = 
Mockito.mock(MessageRetryRunner.class);
+
+    private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
+
+    private TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
+
+    private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+            Mockito.mock(WorkerTaskExecutorThreadPool.class);
+
+    private WorkerTaskExecutor workerTaskExecutor = 
Mockito.mock(WorkerTaskExecutor.class);
+
+    private AbstractTask task = Mockito.mock(AbstractTask.class);
+
+    private WorkerMessageSender workerMessageSender = 
Mockito.mock(WorkerMessageSender.class);
+
+    private TaskPluginManager taskPluginManager = 
Mockito.mock(TaskPluginManager.class);
+
+    private WorkerTaskExecutorThreadPool workerManager = 
Mockito.mock(WorkerTaskExecutorThreadPool.class);
+
+    private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
+
+    private WorkerRegistryClient workerRegistryClient = 
Mockito.mock(WorkerRegistryClient.class);
+
+    @Test
+    public void testTaskInstanceOperationFunctionManager() {
+        TaskInstanceKillOperationFunction taskInstanceKillOperationFunction = 
new TaskInstanceKillOperationFunction(
+                workerTaskExecutorThreadPool,
+                messageRetryRunner);
+
+        TaskInstancePauseOperationFunction taskInstancePauseOperationFunction =
+                new TaskInstancePauseOperationFunction();
+
+        UpdateWorkflowHostOperationFunction 
updateWorkflowHostOperationFunction =
+                new UpdateWorkflowHostOperationFunction(
+                        messageRetryRunner);
+
+        WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = 
new WorkerTaskExecutorFactoryBuilder(
+                workerConfig,
+                workerMessageSender,
+                taskPluginManager,
+                workerManager,
+                storageOperate,
+                workerRegistryClient);
+
+        TaskInstanceDispatchOperationFunction 
taskInstanceDispatchOperationFunction =
+                new TaskInstanceDispatchOperationFunction(
+                        workerConfig,
+                        workerTaskExecutorFactoryBuilder,
+                        workerTaskExecutorThreadPool);
+
+        TaskInstanceOperationFunctionManager 
taskInstanceOperationFunctionManager =
+                new TaskInstanceOperationFunctionManager(
+                        taskInstanceKillOperationFunction,
+                        updateWorkflowHostOperationFunction,
+                        taskInstanceDispatchOperationFunction,
+                        taskInstancePauseOperationFunction);
+
+        Assertions.assertEquals(taskInstanceKillOperationFunction,
+                
taskInstanceOperationFunctionManager.getTaskInstanceKillOperationFunction());
+        Assertions.assertEquals(taskInstancePauseOperationFunction,
+                
taskInstanceOperationFunctionManager.getTaskInstancePauseOperationFunction());
+        Assertions.assertEquals(updateWorkflowHostOperationFunction,
+                
taskInstanceOperationFunctionManager.getUpdateWorkflowHostOperationFunction());
+        Assertions.assertEquals(taskInstanceDispatchOperationFunction,
+                
taskInstanceOperationFunctionManager.getTaskInstanceDispatchOperationFunction());
+    }
+
+    @Test
+    public void testUpdateWorkflowHostOperationFunction() {
+        UpdateWorkflowHostOperationFunction 
updateWorkflowHostOperationFunction =
+                new UpdateWorkflowHostOperationFunction(
+                        messageRetryRunner);
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            .setTaskInstanceIdMDC(any(Integer.class)))
+                    .then(invocationOnMock -> null);
+            UpdateWorkflowHostRequest request = new 
UpdateWorkflowHostRequest();
+            request.setTaskInstanceId(1);
+            request.setWorkflowHost("host");
+            UpdateWorkflowHostResponse taskInstanceDispatchResponse = 
updateWorkflowHostOperationFunction.operate(
+                    request);
+            Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), 
false);
+        }
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            .setTaskInstanceIdMDC(any(Integer.class)))
+                    .then(invocationOnMock -> null);
+
+            try (
+                    MockedStatic<WorkerTaskExecutorHolder> 
workerTaskExecutorHolderMockedStatic =
+                            
Mockito.mockStatic(WorkerTaskExecutorHolder.class)) {
+                
given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext);
+                workerTaskExecutorHolderMockedStatic
+                        .when(() -> 
WorkerTaskExecutorHolder.get(any(Integer.class)))
+                        .thenReturn(workerTaskExecutor);
+                int taskInstanceId = 111;
+                UpdateWorkflowHostRequest request = new 
UpdateWorkflowHostRequest();
+                request.setTaskInstanceId(taskInstanceId);
+                request.setWorkflowHost("host");
+
+                UpdateWorkflowHostResponse taskInstanceDispatchResponse = 
updateWorkflowHostOperationFunction.operate(
+                        request);
+                
Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), true);
+            }
+        }
+    }
+
+    @Test
+    public void testTaskInstancePauseOperationFunction() {
+        TaskInstancePauseOperationFunction taskInstancePauseOperationFunction =
+                new TaskInstancePauseOperationFunction();
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            .setTaskInstanceIdMDC(any(Integer.class)))
+                    .then(invocationOnMock -> null);
+            TaskInstancePauseRequest request = new TaskInstancePauseRequest();
+            request.setTaskInstanceId(1);
+            TaskInstancePauseResponse taskInstanceDispatchResponse = 
taskInstancePauseOperationFunction.operate(
+                    request);
+            Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), 
true);
+        }
+    }
+
+    @Test
+    public void testTaskInstanceDispatchOperationFunction() {
+        WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = 
new WorkerTaskExecutorFactoryBuilder(
+                workerConfig,
+                workerMessageSender,
+                taskPluginManager,
+                workerManager,
+                storageOperate,
+                workerRegistryClient);
+
+        TaskInstanceDispatchOperationFunction 
taskInstanceDispatchOperationFunction =
+                new TaskInstanceDispatchOperationFunction(
+                        workerConfig,
+                        workerTaskExecutorFactoryBuilder,
+                        workerTaskExecutorThreadPool);
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            
.getTaskInstanceLogFullPath(any(TaskExecutionContext.class)))
+                    .thenReturn("test");
+            TaskInstanceDispatchResponse taskInstanceDispatchResponse = 
taskInstanceDispatchOperationFunction.operate(
+                    new TaskInstanceDispatchRequest(taskExecutionContext));
+            
Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(), 
false);
+            logUtilsMockedStatic.verify(times(1), () -> 
LogUtils.removeWorkflowAndTaskInstanceIdMDC());
+
+            
given(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(any())).willReturn(true);
+            taskInstanceDispatchResponse = 
taskInstanceDispatchOperationFunction.operate(
+                    new TaskInstanceDispatchRequest(taskExecutionContext));
+            
Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(), true);
+            logUtilsMockedStatic.verify(times(2), () -> 
LogUtils.removeWorkflowAndTaskInstanceIdMDC());
+        }
+    }
+
+    @Test
+    public void testTaskInstanceKillOperationFunction() {
+        TaskInstanceKillOperationFunction taskInstanceKillOperationFunction = 
new TaskInstanceKillOperationFunction(
+                workerManager,
+                messageRetryRunner);
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            int taskInstanceId = 111;
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            .setTaskInstanceLogFullPathMDC(any(String.class)))
+                    .then(invocationOnMock -> null);
+            TaskInstanceKillResponse response = 
taskInstanceKillOperationFunction.operate(
+                    new TaskInstanceKillRequest(taskInstanceId));
+            Assertions.assertEquals("Cannot find WorkerTaskExecutor", 
response.getMessage());
+        }
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            int processId = 12;
+            int taskInstanceId = 111;
+            Mockito.reset(taskExecutionContext);
+            given(taskExecutionContext.getProcessId()).willReturn(processId);
+            given(taskExecutionContext.getLogPath()).willReturn("logpath");
+            logUtilsMockedStatic
+                    .when(() -> LogUtils
+                            .setTaskInstanceLogFullPathMDC(any(String.class)))
+                    .then(invocationOnMock -> null);
+            taskInstanceKillOperationFunction.operate(
+                    new TaskInstanceKillRequest(taskInstanceId));
+            logUtilsMockedStatic.verify(times(1), () -> 
LogUtils.removeTaskInstanceIdMDC());
+            logUtilsMockedStatic.verify(times(1), () -> 
LogUtils.removeTaskInstanceLogFullPathMDC());
+        }
+
+        try (MockedStatic<LogUtils> logUtilsMockedStatic = 
Mockito.mockStatic(LogUtils.class)) {
+            try (
+                    MockedStatic<WorkerTaskExecutorHolder> 
workerTaskExecutorHolderMockedStatic =
+                            
Mockito.mockStatic(WorkerTaskExecutorHolder.class)) {
+                
given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext);
+                workerTaskExecutorHolderMockedStatic
+                        .when(() -> 
WorkerTaskExecutorHolder.get(any(Integer.class)))
+                        .thenReturn(workerTaskExecutor);
+                int processId = 12;
+                int taskInstanceId = 111;
+                Mockito.reset(taskExecutionContext);
+                
given(taskExecutionContext.getProcessId()).willReturn(processId);
+                given(taskExecutionContext.getLogPath()).willReturn("logpath");
+                logUtilsMockedStatic
+                        .when(() -> LogUtils
+                                
.setTaskInstanceLogFullPathMDC(any(String.class)))
+                        .then(invocationOnMock -> null);
+                when(workerTaskExecutor.getTask()).thenReturn(task);
+                // 
given(workerManager.getTaskExecuteThread(taskInstanceId)).willReturn(workerTaskExecutor);
+                taskInstanceKillOperationFunction.operate(
+                        new TaskInstanceKillRequest(taskInstanceId));
+                verify(task, times(1)).cancel();
+            }
+
+        }
+    }
+}


Reply via email to