This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
new 0f8689feca [Bug-12482] [Master] check for dependency task instance
need failover is error (#12513)
0f8689feca is described below
commit 0f8689feca40d741f12b9b943934b60eb3822196
Author: Molin Wang <[email protected]>
AuthorDate: Thu Oct 27 14:59:04 2022 +0800
[Bug-12482] [Master] check for dependency task instance need failover is
error (#12513)
---
.../master/registry/MasterRegistryClient.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 26e2585dd9..e670818f19 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -253,12 +253,12 @@ public class MasterRegistryClient {
}
/**
- * task needs failover if task start before worker starts
+ * task needs failover if the task started before the server started
*
* @param taskInstance task instance
* @return true if task instance need fail over
*/
- private boolean checkTaskInstanceNeedFailover(List<Server> workerServers,
TaskInstance taskInstance) {
+ private boolean checkTaskInstanceNeedFailover(List<Server> servers,
TaskInstance taskInstance) {
// first submit: host is null
// dispatch succeed: host is not null && submit_time is null
@@ -274,8 +274,8 @@ public class MasterRegistryClient {
if (taskInstance.getSubmitTime() == null) {
return false;
}
- //if task start after worker starts, there is no need to failover the
task.
- if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+ // if the task started after the server started, there is no need to
fail over the task.
+ if (checkTaskAfterServerStart(servers, taskInstance)) {
taskNeedFailover = false;
}
@@ -307,21 +307,21 @@ public class MasterRegistryClient {
}
/**
- * check task start after the worker server starts.
+ * check whether the task started after the server started.
*
* @param taskInstance task instance
- * @return true if task instance start time after worker server start date
+ * @return true if the task instance start time is after the server start date
*/
- private boolean checkTaskAfterWorkerStart(List<Server> workerServers,
TaskInstance taskInstance) {
+ private boolean checkTaskAfterServerStart(List<Server> servers,
TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getHost())) {
return false;
}
Date taskTime = taskInstance.getStartTime() == null ?
taskInstance.getSubmitTime() : taskInstance.getStartTime();
- Date workerServerStartDate = getServerStartupTime(workerServers,
taskInstance.getHost());
- if (workerServerStartDate != null) {
- return taskTime.after(workerServerStartDate);
+ Date serverStartDate = getServerStartupTime(servers,
taskInstance.getHost());
+ if (serverStartDate != null) {
+ return taskTime.after(serverStartDate);
}
return false;
}
@@ -417,7 +417,10 @@ public class MasterRegistryClient {
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
- List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
+
+ // servers must contain both master hosts and worker hosts; otherwise
failover of logic tasks will fail.
+ List<Server> servers = registryClient.getServerList(NodeType.WORKER);
+ servers.addAll(registryClient.getServerList(NodeType.MASTER));
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
@@ -436,7 +439,7 @@ public class MasterRegistryClient {
if (taskInstance.getState().typeIsFinished()) {
continue;
}
- if (!checkTaskInstanceNeedFailover(workerServers,
taskInstance)) {
+ if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
continue;
}
logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());