This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 998e4d74dd Kill task when we do master failover to avoid task lost (#10997)
998e4d74dd is described below
commit 998e4d74ddf6fce03d65c351b75b91694c761b07
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 18:44:42 2022 +0800
Kill task when we do master failover to avoid task lost (#10997)
---
.../master/service/MasterFailoverService.java | 21 ++++++++++++++++++++-
.../server/master/service/FailoverServiceTest.java | 6 +++++-
.../remote/command/TaskKillRequestCommand.java | 22 +++++++---------------
3 files changed, 32 insertions(+), 17 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 61ba7c3fd6..93bdae2431 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -28,8 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@@ -63,13 +67,17 @@ public class MasterFailoverService {
private final ProcessService processService;
private final String localAddress;
+ private final NettyExecutorManager nettyExecutorManager;
+
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
- @NonNull ProcessService processService) {
+ @NonNull ProcessService processService,
+ @NonNull NettyExecutorManager
nettyExecutorManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+ this.nettyExecutorManager = nettyExecutorManager;
}
@@ -221,6 +229,17 @@ public class MasterFailoverService {
LOGGER.info("TaskInstance failover begin kill the task related
yarn job");
ProcessUtils.killYarnJob(taskExecutionContext);
}
+ // kill worker task, When the master failover and worker failover
happened in the same time,
+ // the task may not be failover if we don't set
NEED_FAULT_TOLERANCE.
+ // This can be improved if we can load all task when cache a
workflowInstance in memory
+ try {
+ TaskKillRequestCommand killCommand = new
TaskKillRequestCommand(taskInstance.getId());
+ Host workerHost = Host.of(taskInstance.getHost());
+ nettyExecutorManager.doExecute(workerHost,
killCommand.convert2Command());
+ LOGGER.info("Failover task success, has killed the task in
worker: {}", taskInstance.getHost());
+ } catch (ExecuteException e) {
+ LOGGER.error("Kill task failed", e);
+ }
} else {
LOGGER.info("The failover taskInstance is a master task");
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 44e5a382f6..ab71cd75cd 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -82,6 +83,9 @@ public class FailoverServiceTest {
@Mock
private ProcessInstanceExecCacheManager cacheManager;
+ @Mock
+ private NettyExecutorManager nettyExecutorManager;
+
private static int masterPort = 5678;
private static int workerPort = 1234;
@@ -100,7 +104,7 @@ public class FailoverServiceTest {
given(masterConfig.getListenPort()).willReturn(masterPort);
MasterFailoverService masterFailoverService =
- new MasterFailoverService(registryClient, masterConfig,
processService);
+ new MasterFailoverService(registryClient, masterConfig,
processService, nettyExecutorManager);
WorkerFailoverService workerFailoverService = new
WorkerFailoverService(registryClient,
masterConfig,
processService,
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index 155b31785e..7dc03d778c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -21,9 +21,16 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
* kill task request command
*/
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
public class TaskKillRequestCommand implements Serializable {
/**
@@ -31,14 +38,6 @@ public class TaskKillRequestCommand implements Serializable {
*/
private int taskInstanceId;
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
/**
* package request command
*
@@ -51,11 +50,4 @@ public class TaskKillRequestCommand implements Serializable {
command.setBody(body);
return command;
}
-
- @Override
- public String toString() {
- return "TaskKillRequestCommand{"
- + "taskInstanceId=" + taskInstanceId
- + '}';
- }
}