This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6c7e00c4aa [Bug-11101] fix task failover NPE (#11168)
6c7e00c4aa is described below
commit 6c7e00c4aa51172dd36d505885ebcd848eed4f3c
Author: caishunfeng <[email protected]>
AuthorDate: Wed Jul 27 19:21:21 2022 +0800
[Bug-11101] fix task failover NPE (#11168)
---
.../master/service/MasterFailoverService.java | 24 ++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index f89c872784..856418b509 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactor
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
@@ -232,14 +233,7 @@ public class MasterFailoverService {
// kill worker task, When the master failover and worker failover
happened in the same time,
// the task may not be failover if we don't set
NEED_FAULT_TOLERANCE.
// This can be improved if we can load all task when cache a
workflowInstance in memory
- try {
- TaskKillRequestCommand killCommand = new
TaskKillRequestCommand(taskInstance.getId());
- Host workerHost = Host.of(taskInstance.getHost());
- nettyExecutorManager.doExecute(workerHost,
killCommand.convert2Command());
- LOGGER.info("Failover task success, has killed the task in
worker: {}", taskInstance.getHost());
- } catch (ExecuteException e) {
- LOGGER.error("Kill task failed", e);
- }
+ sendKillCommandToWorker(taskInstance);
} else {
LOGGER.info("The failover taskInstance is a master task");
}
@@ -249,6 +243,20 @@ public class MasterFailoverService {
processService.saveTaskInstance(taskInstance);
}
+ private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(taskInstance.getHost())) {
+ return;
+ }
+ try {
+ TaskKillRequestCommand killCommand = new
TaskKillRequestCommand(taskInstance.getId());
+ Host workerHost = Host.of(taskInstance.getHost());
+ nettyExecutorManager.doExecute(workerHost,
killCommand.convert2Command());
+ LOGGER.info("Failover task success, has killed the task in worker:
{}", taskInstance.getHost());
+ } catch (ExecuteException e) {
+ LOGGER.error("Kill task failed", e);
+ }
+ }
+
private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance
taskInstance) {
if (taskInstance.getState() != null &&
taskInstance.getState().typeIsFinished()) {
// The task is already finished, so we don't need to failover this
task instance