This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6c7e00c4aa [Bug-11101] fix task failover NPE (#11168)
6c7e00c4aa is described below

commit 6c7e00c4aa51172dd36d505885ebcd848eed4f3c
Author: caishunfeng <[email protected]>
AuthorDate: Wed Jul 27 19:21:21 2022 +0800

    [Bug-11101] fix task failover NPE (#11168)
---
 .../master/service/MasterFailoverService.java      | 24 ++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index f89c872784..856418b509 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactor
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.time.StopWatch;
@@ -232,14 +233,7 @@ public class MasterFailoverService {
             // kill worker task, When the master failover and worker failover happened in the same time,
             // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
             // This can be improved if we can load all task when cache a workflowInstance in memory
-            try {
-                TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
-                Host workerHost = Host.of(taskInstance.getHost());
-                nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
-                LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
-            } catch (ExecuteException e) {
-                LOGGER.error("Kill task failed", e);
-            }
+            sendKillCommandToWorker(taskInstance);
         } else {
             LOGGER.info("The failover taskInstance is a master task");
         }
@@ -249,6 +243,20 @@ public class MasterFailoverService {
         processService.saveTaskInstance(taskInstance);
     }
 
+    private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return;
+        }
+        try {
+            TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
+            Host workerHost = Host.of(taskInstance.getHost());
+            nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
+            LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
+        } catch (ExecuteException e) {
+            LOGGER.error("Kill task failed", e);
+        }
+    }
+
     private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
         if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
             // The task is already finished, so we don't need to failover this task instance

Reply via email to