This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 17f0be5bfbbb17a910ff05e795a582e378d07f63
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 18:44:42 2022 +0800

    Kill task when we do master failover to avoid task lost (#10997)
    
    (cherry picked from commit 998e4d74ddf6fce03d65c351b75b91694c761b07)
---
 .../master/service/MasterFailoverService.java      | 21 ++++++++++++++++++++-
 .../server/master/service/FailoverServiceTest.java |  6 +++++-
 .../remote/command/TaskKillRequestCommand.java     | 22 +++++++---------------
 3 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 61ba7c3fd6..93bdae2431 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -28,8 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@@ -63,13 +67,17 @@ public class MasterFailoverService {
     private final ProcessService processService;
     private final String localAddress;
 
+    private final NettyExecutorManager nettyExecutorManager;
+
     public MasterFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
-                                 @NonNull ProcessService processService) {
+                                 @NonNull ProcessService processService,
+                                 @NonNull NettyExecutorManager nettyExecutorManager) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
         this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.nettyExecutorManager = nettyExecutorManager;
 
     }
 
@@ -221,6 +229,17 @@ public class MasterFailoverService {
                 LOGGER.info("TaskInstance failover begin kill the task related yarn job");
                 ProcessUtils.killYarnJob(taskExecutionContext);
             }
+            // kill worker task, When the master failover and worker failover happened in the same time,
+            // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
+            // This can be improved if we can load all task when cache a workflowInstance in memory
+            try {
+                TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
+                Host workerHost = Host.of(taskInstance.getHost());
+                nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
+                LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
+            } catch (ExecuteException e) {
+                LOGGER.error("Kill task failed", e);
+            }
         } else {
             LOGGER.info("The failover taskInstance is a master task");
         }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 44e5a382f6..ab71cd75cd 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -82,6 +83,9 @@ public class FailoverServiceTest {
     @Mock
     private ProcessInstanceExecCacheManager cacheManager;
 
+    @Mock
+    private NettyExecutorManager nettyExecutorManager;
+
     private static int masterPort = 5678;
     private static int workerPort = 1234;
 
@@ -100,7 +104,7 @@ public class FailoverServiceTest {
 
         given(masterConfig.getListenPort()).willReturn(masterPort);
         MasterFailoverService masterFailoverService =
-            new MasterFailoverService(registryClient, masterConfig, processService);
+            new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
         WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
             masterConfig,
             processService,
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index 155b31785e..7dc03d778c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -21,9 +21,16 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
  *  kill task request command
  */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class TaskKillRequestCommand implements Serializable {
 
     /**
@@ -31,14 +38,6 @@ public class TaskKillRequestCommand implements Serializable {
      */
     private int taskInstanceId;
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
     /**
      *  package request command
      *
@@ -51,11 +50,4 @@ public class TaskKillRequestCommand implements Serializable {
         command.setBody(body);
         return command;
     }
-
-    @Override
-    public String toString() {
-        return "TaskKillRequestCommand{"
-                + "taskInstanceId=" + taskInstanceId
-                + '}';
-    }
 }

Reply via email to