This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b6b04617a2 [Fix-16942] Fix global master failover might cause master
dead (#16953)
b6b04617a2 is described below
commit b6b04617a21afa2166e7076c6fb89458b7b1fc8f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jan 10 16:58:04 2025 +0800
[Fix-16942] Fix global master failover might cause master dead (#16953)
Co-authored-by: xiangzihao <[email protected]>
---
.../server/master/cluster/BaseServerMetadata.java | 2 +
.../master/cluster/MasterServerMetadata.java | 1 +
.../master/cluster/WorkerServerMetadata.java | 1 +
.../engine/system/event/MasterFailoverEvent.java | 1 +
.../master/failover/FailoverCoordinator.java | 90 ++++++++++++----------
.../master/registry/MasterHeartBeatTask.java | 2 +-
.../registry/api/enums/RegistryNodeType.java | 1 +
.../registry/api/utils/RegistryUtils.java | 26 +++++--
.../server/worker/task/WorkerHeartBeatTask.java | 2 +-
9 files changed, 78 insertions(+), 48 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
index c9b89eee70..04c5954126 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
@@ -28,6 +28,8 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder
public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
+ private final int processId;
+
// The server startup time in milliseconds.
private final long serverStartupTime;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index c49833d5da..f68e13d7e9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -33,6 +33,7 @@ public class MasterServerMetadata extends BaseServerMetadata
implements Comparab
public static MasterServerMetadata parseFromHeartBeat(final
MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder()
+ .processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
.address(masterHeartBeat.getHost() + Constants.COLON +
masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index d853c7d061..c4d196718b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -41,6 +41,7 @@ public class WorkerServerMetadata extends BaseServerMetadata {
public static WorkerServerMetadata parseFromHeartBeat(final
WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder()
+ .processId(workerHeartBeat.getProcessId())
.serverStartupTime(workerHeartBeat.getStartupTime())
.address(workerHeartBeat.getHost() + Constants.COLON +
workerHeartBeat.getPort())
.workerGroup(workerHeartBeat.getWorkerGroup())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
index f2086a6569..8e18477675 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
@@ -29,6 +29,7 @@ import lombok.Getter;
public class MasterFailoverEvent extends AbstractSystemEvent {
private final MasterServerMetadata masterServerMetadata;
+ // The time when the event occurred. This might be different at different
nodes.
private final Date eventTime;
private MasterFailoverEvent(final MasterServerMetadata
masterServerMetadata,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 7bb24907ca..30a9ae773b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -44,7 +44,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Component
@@ -65,9 +64,6 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
- @Autowired
- private PlatformTransactionManager platformTransactionManager;
-
@Autowired
private WorkflowFailover workflowFailover;
@@ -81,13 +77,21 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
+ // If the master is alive, then we use the alive master's
startup time as the failover deadline.
final MasterServerMetadata aliveMasterServerMetadata =
aliveMasterOptional.get();
log.info("The master[{}] is alive, do global master failover
on it", aliveMasterServerMetadata);
- doMasterFailover(aliveMasterServerMetadata.getAddress(),
- aliveMasterServerMetadata.getServerStartupTime());
+ doMasterFailover(
+ masterAddress,
+ aliveMasterServerMetadata.getServerStartupTime(),
+
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
+ masterAddress));
} else {
+ // If the master is not alive, then we use the event time as
the failover deadline.
log.info("The master[{}] is not alive, do global master
failover on it", masterAddress);
- doMasterFailover(masterAddress,
globalMasterFailoverEvent.getEventTime().getTime());
+ doMasterFailover(
+ masterAddress,
+ globalMasterFailoverEvent.getEventTime().getTime(),
+
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
}
}
@@ -99,53 +103,55 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
final MasterServerMetadata masterServerMetadata =
masterFailoverEvent.getMasterServerMetadata();
log.info("Master[{}] failover starting", masterServerMetadata);
+ final String masterAddress = masterServerMetadata.getAddress();
final Optional<MasterServerMetadata> aliveMasterOptional =
-
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
+ clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
final MasterServerMetadata aliveMasterServerMetadata =
aliveMasterOptional.get();
if (aliveMasterServerMetadata.getServerStartupTime() ==
masterServerMetadata.getServerStartupTime()) {
log.info("The master[{}] is alive, maybe it reconnect to
registry skip failover", masterServerMetadata);
- } else {
- log.info("The master[{}] is alive, but the startup time is
different, will failover on {}",
- masterServerMetadata,
- aliveMasterServerMetadata);
- doMasterFailover(aliveMasterServerMetadata.getAddress(),
- aliveMasterServerMetadata.getServerStartupTime());
+ return;
}
- } else {
- log.info("The master[{}] is not alive, will failover",
masterServerMetadata);
- doMasterFailover(masterServerMetadata.getAddress(),
masterServerMetadata.getServerStartupTime());
}
+ doMasterFailover(
+ masterServerMetadata.getAddress(),
+ masterFailoverEvent.getEventTime().getTime(),
+ RegistryUtils.getFailoveredNodePath(
+ masterServerMetadata.getAddress(),
+ masterServerMetadata.getServerStartupTime(),
+ masterServerMetadata.getProcessId()));
}
/**
* Do master failover.
* <p> Will failover the workflow which is scheduled by the master and the
workflow's fire time is before the maxWorkflowFireTime.
*/
- private void doMasterFailover(final String masterAddress, final long
masterStartupTime) {
+ private void doMasterFailover(final String masterAddress,
+ final long workflowFailoverDeadline,
+ final String masterFailoverNodePath) {
// We use lock to avoid multiple master failover at the same time.
// Once the workflow has been failovered, then its state will be
changed to FAILOVER
// Once the FAILOVER workflow has been refired, then its host will be
changed to the new master and have a new
// start time.
// So if a master has been failovered multiple times, there is no
problem.
final StopWatch failoverTimeCost = StopWatch.createStarted();
-
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
+
registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
try {
- final String failoverFinishedNodePath =
- RegistryUtils.getFailoverFinishedNodePath(masterAddress,
masterStartupTime);
- if (registryClient.exists(failoverFinishedNodePath)) {
- log.error("The master[{}-{}] is exist at: {}, means it has
already been failovered, skip failover",
+ // If the master has already been failovered, then we skip the
failover.
+ if (registryClient.exists(masterFailoverNodePath)
+ &&
String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath)))
{
+ log.error("The master[{}/{}] is exist at: {}, means it has
already been failovered, skip failover",
masterAddress,
- masterStartupTime,
- failoverFinishedNodePath);
+ workflowFailoverDeadline,
+ masterFailoverNodePath);
return;
}
final List<WorkflowInstance> needFailoverWorkflows =
- getFailoverWorkflowsForMaster(masterAddress, new
Date(masterStartupTime));
+ getFailoverWorkflowsForMaster(masterAddress, new
Date(workflowFailoverDeadline));
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
+ registryClient.persist(masterFailoverNodePath,
String.valueOf(workflowFailoverDeadline));
failoverTimeCost.stop();
- registryClient.persist(failoverFinishedNodePath,
String.valueOf(System.currentTimeMillis()));
log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
masterAddress,
needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
final WorkerServerMetadata aliveWorkerServerMetadata =
aliveWorkerOptional.get();
if (aliveWorkerServerMetadata.getServerStartupTime() ==
workerServerMetadata.getServerStartupTime()) {
log.info("The worker[{}] is alive, maybe it reconnect to
registry skip failover", workerServerMetadata);
- } else {
- log.info("The worker[{}] is alive, but the startup time is
different, will failover on {}",
- workerServerMetadata,
- aliveWorkerServerMetadata);
- doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
- aliveWorkerServerMetadata.getServerStartupTime());
+ return;
}
- } else {
- log.info("The worker[{}] is not alive, will failover",
workerServerMetadata);
- doWorkerFailover(workerServerMetadata.getAddress(),
workerServerMetadata.getServerStartupTime());
}
+ doWorkerFailover(
+ workerServerMetadata.getAddress(),
+ System.currentTimeMillis(),
+ RegistryUtils.getFailoveredNodePath(
+ workerServerMetadata.getAddress(),
+ workerServerMetadata.getServerStartupTime(),
+ workerServerMetadata.getProcessId()));
}
- private void doWorkerFailover(final String workerAddress, final long
workerCrashTime) {
+ private void doWorkerFailover(final String workerAddress,
+ final long taskFailoverDeadline,
+ final String workerFailoverNodePath) {
final StopWatch failoverTimeCost = StopWatch.createStarted();
+ // We don't check whether the workerFailoverNodePath exists, since the worker
may be failovered by multiple masters
final List<ITaskExecutionRunnable> needFailoverTasks =
- getFailoverTaskForWorker(workerAddress, new
Date(workerCrashTime));
+ getFailoverTaskForWorker(workerAddress, new
Date(taskFailoverDeadline));
needFailoverTasks.forEach(taskFailover::failoverTask);
registryClient.persist(
- RegistryUtils.getFailoverFinishedNodePath(workerAddress,
workerCrashTime),
+ workerFailoverNodePath,
String.valueOf(System.currentTimeMillis()));
failoverTimeCost.stop();
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
}
private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String
workerAddress,
- final Date
workerCrashTime) {
+ final Date
taskFailoverDeadline) {
return workflowRepository.getAll()
.stream()
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
// The submitTime should not be null.
// This is a bad case unless someone manually set the
submitTime to null.
final Date submitTime =
taskExecutionRunnable.getTaskInstance().getSubmitTime();
- return submitTime != null &&
submitTime.before(workerCrashTime);
+ return submitTime != null &&
submitTime.before(taskFailoverDeadline);
})
.collect(Collectors.toList());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index cf33c4824a..59174bac30 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -86,7 +86,7 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
@Override
public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
- final String failoverNodePath =
RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
+ final String failoverNodePath =
RegistryUtils.getFailoveredNodePath(masterHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The master: {} is under {}, means it has been failover
will close myself",
masterHeartBeat,
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 75deb8a02d..c026231562 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -26,6 +26,7 @@ public enum RegistryNodeType {
FAILOVER_FINISH_NODES("FailoverFinishNodes",
"/nodes/failover-finish-nodes"),
+ GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock",
"/lock/global-master-failover"),
MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
index 25ef976ed5..7edcce3c84 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
@@ -20,14 +20,30 @@ package org.apache.dolphinscheduler.registry.api.utils;
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import com.google.common.base.Preconditions;
+
public class RegistryUtils {
- public static String getFailoverFinishedNodePath(final BaseHeartBeat
baseHeartBeat) {
- return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" +
baseHeartBeat.getPort(),
- baseHeartBeat.getStartupTime());
+ public static String getMasterFailoverLockPath(final String masterAddress)
{
+ Preconditions.checkNotNull(masterAddress, "master address cannot be
null");
+ return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" +
masterAddress;
+ }
+
+ public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final
String serverAddress) {
+ return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/"
+ serverAddress + "-" + "unknown" + "-"
+ + "unknown";
+ }
+
+ public static String getFailoveredNodePath(final BaseHeartBeat
baseHeartBeat) {
+ return getFailoveredNodePath(
+ baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
+ baseHeartBeat.getStartupTime(),
+ baseHeartBeat.getProcessId());
}
- public static String getFailoverFinishedNodePath(final String
masterAddress, final long masterStartupTime) {
- return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/"
+ masterAddress + "-" + masterStartupTime;
+ public static String getFailoveredNodePath(final String serverAddress,
final long serverStartupTime,
+ final int processId) {
+ return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/"
+ serverAddress + "-" + serverStartupTime
+ + "-" + processId;
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 739cf95e22..01ed2f541e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -85,7 +85,7 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
@Override
public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
- final String failoverNodePath =
RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
+ final String failoverNodePath =
RegistryUtils.getFailoveredNodePath(workerHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The worker: {} is under {}, means it has been failover
will close myself",
workerHeartBeat,