This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6b04617a2 [Fix-16942] Fix global master failover might cause master 
dead (#16953)
b6b04617a2 is described below

commit b6b04617a21afa2166e7076c6fb89458b7b1fc8f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jan 10 16:58:04 2025 +0800

    [Fix-16942] Fix global master failover might cause master dead (#16953)
    
    Co-authored-by: xiangzihao <[email protected]>
---
 .../server/master/cluster/BaseServerMetadata.java  |  2 +
 .../master/cluster/MasterServerMetadata.java       |  1 +
 .../master/cluster/WorkerServerMetadata.java       |  1 +
 .../engine/system/event/MasterFailoverEvent.java   |  1 +
 .../master/failover/FailoverCoordinator.java       | 90 ++++++++++++----------
 .../master/registry/MasterHeartBeatTask.java       |  2 +-
 .../registry/api/enums/RegistryNodeType.java       |  1 +
 .../registry/api/utils/RegistryUtils.java          | 26 +++++--
 .../server/worker/task/WorkerHeartBeatTask.java    |  2 +-
 9 files changed, 78 insertions(+), 48 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
index c9b89eee70..04c5954126 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
@@ -28,6 +28,8 @@ import lombok.experimental.SuperBuilder;
 @SuperBuilder
 public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
 
+    private final int processId;
+
     // The server startup time in milliseconds.
     private final long serverStartupTime;
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index c49833d5da..f68e13d7e9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -33,6 +33,7 @@ public class MasterServerMetadata extends BaseServerMetadata 
implements Comparab
 
     public static MasterServerMetadata parseFromHeartBeat(final 
MasterHeartBeat masterHeartBeat) {
         return MasterServerMetadata.builder()
+                .processId(masterHeartBeat.getProcessId())
                 .serverStartupTime(masterHeartBeat.getStartupTime())
                 .address(masterHeartBeat.getHost() + Constants.COLON + 
masterHeartBeat.getPort())
                 .cpuUsage(masterHeartBeat.getCpuUsage())
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index d853c7d061..c4d196718b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -41,6 +41,7 @@ public class WorkerServerMetadata extends BaseServerMetadata {
 
     public static WorkerServerMetadata parseFromHeartBeat(final 
WorkerHeartBeat workerHeartBeat) {
         return WorkerServerMetadata.builder()
+                .processId(workerHeartBeat.getProcessId())
                 .serverStartupTime(workerHeartBeat.getStartupTime())
                 .address(workerHeartBeat.getHost() + Constants.COLON + 
workerHeartBeat.getPort())
                 .workerGroup(workerHeartBeat.getWorkerGroup())
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
index f2086a6569..8e18477675 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
@@ -29,6 +29,7 @@ import lombok.Getter;
 public class MasterFailoverEvent extends AbstractSystemEvent {
 
     private final MasterServerMetadata masterServerMetadata;
+    // The time when the event occurred. This might be different at different 
nodes.
     private final Date eventTime;
 
     private MasterFailoverEvent(final MasterServerMetadata 
masterServerMetadata,
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 7bb24907ca..30a9ae773b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -44,7 +44,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
 
 @Slf4j
 @Component
@@ -65,9 +64,6 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
     @Autowired
     private WorkflowInstanceDao workflowInstanceDao;
 
-    @Autowired
-    private PlatformTransactionManager platformTransactionManager;
-
     @Autowired
     private WorkflowFailover workflowFailover;
 
@@ -81,13 +77,21 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
             final Optional<MasterServerMetadata> aliveMasterOptional =
                     
clusterManager.getMasterClusters().getServer(masterAddress);
             if (aliveMasterOptional.isPresent()) {
+                // If the master is alive, then we use the alive master's 
startup time as the failover deadline.
                 final MasterServerMetadata aliveMasterServerMetadata = 
aliveMasterOptional.get();
                 log.info("The master[{}] is alive, do global master failover 
on it", aliveMasterServerMetadata);
-                doMasterFailover(aliveMasterServerMetadata.getAddress(),
-                        aliveMasterServerMetadata.getServerStartupTime());
+                doMasterFailover(
+                        masterAddress,
+                        aliveMasterServerMetadata.getServerStartupTime(),
+                        
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
+                                masterAddress));
             } else {
+                // If the master is not alive, then we use the event time as 
the failover deadline.
                 log.info("The master[{}] is not alive, do global master 
failover on it", masterAddress);
-                doMasterFailover(masterAddress, 
globalMasterFailoverEvent.getEventTime().getTime());
+                doMasterFailover(
+                        masterAddress,
+                        globalMasterFailoverEvent.getEventTime().getTime(),
+                        
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
             }
         }
 
@@ -99,53 +103,55 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
     public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
         final MasterServerMetadata masterServerMetadata = 
masterFailoverEvent.getMasterServerMetadata();
         log.info("Master[{}] failover starting", masterServerMetadata);
+        final String masterAddress = masterServerMetadata.getAddress();
 
         final Optional<MasterServerMetadata> aliveMasterOptional =
-                
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
+                clusterManager.getMasterClusters().getServer(masterAddress);
         if (aliveMasterOptional.isPresent()) {
             final MasterServerMetadata aliveMasterServerMetadata = 
aliveMasterOptional.get();
             if (aliveMasterServerMetadata.getServerStartupTime() == 
masterServerMetadata.getServerStartupTime()) {
                 log.info("The master[{}] is alive, maybe it reconnect to 
registry skip failover", masterServerMetadata);
-            } else {
-                log.info("The master[{}] is alive, but the startup time is 
different, will failover on {}",
-                        masterServerMetadata,
-                        aliveMasterServerMetadata);
-                doMasterFailover(aliveMasterServerMetadata.getAddress(),
-                        aliveMasterServerMetadata.getServerStartupTime());
+                return;
             }
-        } else {
-            log.info("The master[{}] is not alive, will failover", 
masterServerMetadata);
-            doMasterFailover(masterServerMetadata.getAddress(), 
masterServerMetadata.getServerStartupTime());
         }
+        doMasterFailover(
+                masterServerMetadata.getAddress(),
+                masterFailoverEvent.getEventTime().getTime(),
+                RegistryUtils.getFailoveredNodePath(
+                        masterServerMetadata.getAddress(),
+                        masterServerMetadata.getServerStartupTime(),
+                        masterServerMetadata.getProcessId()));
     }
 
     /**
      * Do master failover.
      * <p> Will failover the workflow which is scheduled by the master and the 
workflow's fire time is before the maxWorkflowFireTime.
      */
-    private void doMasterFailover(final String masterAddress, final long 
masterStartupTime) {
+    private void doMasterFailover(final String masterAddress,
+                                  final long workflowFailoverDeadline,
+                                  final String masterFailoverNodePath) {
         // We use lock to avoid multiple master failover at the same time.
         // Once the workflow has been failovered, then it's state will be 
changed to FAILOVER
         // Once the FAILOVER workflow has been refired, then it's host will be 
changed to the new master and have a new
         // start time.
         // So if a master has been failovered multiple times, there is no 
problem.
         final StopWatch failoverTimeCost = StopWatch.createStarted();
-        
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
+        
registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
         try {
-            final String failoverFinishedNodePath =
-                    RegistryUtils.getFailoverFinishedNodePath(masterAddress, 
masterStartupTime);
-            if (registryClient.exists(failoverFinishedNodePath)) {
-                log.error("The master[{}-{}] is exist at: {}, means it has 
already been failovered, skip failover",
+            // If the master has already been failed over with the same deadline, then we 
skip the failover.
+            if (registryClient.exists(masterFailoverNodePath)
+                    && 
String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath)))
 {
+                log.error("The master[{}/{}] is exist at: {}, means it has 
already been failovered, skip failover",
                         masterAddress,
-                        masterStartupTime,
-                        failoverFinishedNodePath);
+                        workflowFailoverDeadline,
+                        masterFailoverNodePath);
                 return;
             }
             final List<WorkflowInstance> needFailoverWorkflows =
-                    getFailoverWorkflowsForMaster(masterAddress, new 
Date(masterStartupTime));
+                    getFailoverWorkflowsForMaster(masterAddress, new 
Date(workflowFailoverDeadline));
             needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
+            registryClient.persist(masterFailoverNodePath, 
String.valueOf(workflowFailoverDeadline));
             failoverTimeCost.stop();
-            registryClient.persist(failoverFinishedNodePath, 
String.valueOf(System.currentTimeMillis()));
             log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
                     masterAddress,
                     needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
             final WorkerServerMetadata aliveWorkerServerMetadata = 
aliveWorkerOptional.get();
             if (aliveWorkerServerMetadata.getServerStartupTime() == 
workerServerMetadata.getServerStartupTime()) {
                 log.info("The worker[{}] is alive, maybe it reconnect to 
registry skip failover", workerServerMetadata);
-            } else {
-                log.info("The worker[{}] is alive, but the startup time is 
different, will failover on {}",
-                        workerServerMetadata,
-                        aliveWorkerServerMetadata);
-                doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
-                        aliveWorkerServerMetadata.getServerStartupTime());
+                return;
             }
-        } else {
-            log.info("The worker[{}] is not alive, will failover", 
workerServerMetadata);
-            doWorkerFailover(workerServerMetadata.getAddress(), 
workerServerMetadata.getServerStartupTime());
         }
+        doWorkerFailover(
+                workerServerMetadata.getAddress(),
+                System.currentTimeMillis(),
+                RegistryUtils.getFailoveredNodePath(
+                        workerServerMetadata.getAddress(),
+                        workerServerMetadata.getServerStartupTime(),
+                        workerServerMetadata.getProcessId()));
     }
 
-    private void doWorkerFailover(final String workerAddress, final long 
workerCrashTime) {
+    private void doWorkerFailover(final String workerAddress,
+                                  final long taskFailoverDeadline,
+                                  final String workerFailoverNodePath) {
         final StopWatch failoverTimeCost = StopWatch.createStarted();
+        // We don't check whether workerFailoverNodePath exists, since the worker 
may be failed over by multiple masters.
 
         final List<ITaskExecutionRunnable> needFailoverTasks =
-                getFailoverTaskForWorker(workerAddress, new 
Date(workerCrashTime));
+                getFailoverTaskForWorker(workerAddress, new 
Date(taskFailoverDeadline));
         needFailoverTasks.forEach(taskFailover::failoverTask);
 
         registryClient.persist(
-                RegistryUtils.getFailoverFinishedNodePath(workerAddress, 
workerCrashTime),
+                workerFailoverNodePath,
                 String.valueOf(System.currentTimeMillis()));
         failoverTimeCost.stop();
         log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
     }
 
     private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String 
workerAddress,
-                                                                  final Date 
workerCrashTime) {
+                                                                  final Date 
taskFailoverDeadline) {
         return workflowRepository.getAll()
                 .stream()
                 .map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
                     // The submitTime should not be null.
                     // This is a bad case unless someone manually set the 
submitTime to null.
                     final Date submitTime = 
taskExecutionRunnable.getTaskInstance().getSubmitTime();
-                    return submitTime != null && 
submitTime.before(workerCrashTime);
+                    return submitTime != null && 
submitTime.before(taskFailoverDeadline);
                 })
                 .collect(Collectors.toList());
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index cf33c4824a..59174bac30 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -86,7 +86,7 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
 
     @Override
     public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
-        final String failoverNodePath = 
RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
+        final String failoverNodePath = 
RegistryUtils.getFailoveredNodePath(masterHeartBeat);
         if (registryClient.exists(failoverNodePath)) {
             log.warn("The master: {} is under {}, means it has been failover 
will close myself",
                     masterHeartBeat,
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 75deb8a02d..c026231562 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -26,6 +26,7 @@ public enum RegistryNodeType {
 
     FAILOVER_FINISH_NODES("FailoverFinishNodes", 
"/nodes/failover-finish-nodes"),
 
+    GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock", 
"/lock/global-master-failover"),
     MASTER("Master", "/nodes/master"),
     MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
     MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
index 25ef976ed5..7edcce3c84 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
@@ -20,14 +20,30 @@ package org.apache.dolphinscheduler.registry.api.utils;
 import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 
+import com.google.common.base.Preconditions;
+
 public class RegistryUtils {
 
-    public static String getFailoverFinishedNodePath(final BaseHeartBeat 
baseHeartBeat) {
-        return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + 
baseHeartBeat.getPort(),
-                baseHeartBeat.getStartupTime());
+    public static String getMasterFailoverLockPath(final String masterAddress) 
{
+        Preconditions.checkNotNull(masterAddress, "master address cannot be 
null");
+        return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + 
masterAddress;
+    }
+
+    public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final 
String serverAddress) {
+        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" 
+ serverAddress + "-" + "unknown" + "-"
+                + "unknown";
+    }
+
+    public static String getFailoveredNodePath(final BaseHeartBeat 
baseHeartBeat) {
+        return getFailoveredNodePath(
+                baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
+                baseHeartBeat.getStartupTime(),
+                baseHeartBeat.getProcessId());
     }
 
-    public static String getFailoverFinishedNodePath(final String 
masterAddress, final long masterStartupTime) {
-        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" 
+ masterAddress + "-" + masterStartupTime;
+    public static String getFailoveredNodePath(final String serverAddress, 
final long serverStartupTime,
+                                               final int processId) {
+        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" 
+ serverAddress + "-" + serverStartupTime
+                + "-" + processId;
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 739cf95e22..01ed2f541e 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -85,7 +85,7 @@ public class WorkerHeartBeatTask extends 
BaseHeartBeatTask<WorkerHeartBeat> {
 
     @Override
     public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
-        final String failoverNodePath = 
RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
+        final String failoverNodePath = 
RegistryUtils.getFailoveredNodePath(workerHeartBeat);
         if (registryClient.exists(failoverNodePath)) {
             log.warn("The worker: {} is under {}, means it has been failover 
will close myself",
                     workerHeartBeat,

Reply via email to