This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 92edbb416d [Improvement-17664][Master] Regularly clean the 
failover mark in registry (#17667)
92edbb416d is described below

commit 92edbb416da8d4dcfd8fe8ceea7c6838c4f55d01
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Nov 22 16:55:29 2025 +0800

    [Improvement-17664][Master] Regularly clean the failover mark in 
registry (#17667)
---
 .../server/master/engine/MasterCoordinator.java    | 25 +++++++++++++++---
 .../master/failover/FailoverCoordinator.java       | 30 ++++++++++++++++++++++
 .../master/failover/IFailoverCoordinator.java      |  6 +++++
 .../registry/api/RegistryClient.java               | 26 -------------------
 4 files changed, 58 insertions(+), 29 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
index eaaa6187c9..da2f9c156c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -24,6 +24,10 @@ import 
org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
 import 
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.failover.IFailoverCoordinator;
+import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+
+import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,15 +42,19 @@ public class MasterCoordinator extends AbstractHAServer {
 
     private final ITaskGroupCoordinator taskGroupCoordinator;
 
+    private final IFailoverCoordinator failoverCoordinator;
+
     public MasterCoordinator(final Registry registry,
                              final MasterConfig masterConfig,
-                             final ITaskGroupCoordinator taskGroupCoordinator) 
{
+                             final ITaskGroupCoordinator taskGroupCoordinator,
+                             final IFailoverCoordinator failoverCoordinator) {
         super(
                 registry,
                 RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
                 masterConfig.getMasterAddress());
         this.taskGroupCoordinator = taskGroupCoordinator;
-        addServerStatusChangeListener(new 
MasterCoordinatorListener(taskGroupCoordinator));
+        this.failoverCoordinator = failoverCoordinator;
+        addServerStatusChangeListener(new 
MasterCoordinatorListener(taskGroupCoordinator, failoverCoordinator));
     }
 
     @Override
@@ -65,13 +73,24 @@ public class MasterCoordinator extends AbstractHAServer {
 
         private final ITaskGroupCoordinator taskGroupCoordinator;
 
-        public MasterCoordinatorListener(ITaskGroupCoordinator 
taskGroupCoordinator) {
+        private final IFailoverCoordinator failoverCoordinator;
+
+        public MasterCoordinatorListener(ITaskGroupCoordinator 
taskGroupCoordinator,
+                                         IFailoverCoordinator 
failoverCoordinator) {
             this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
+            this.failoverCoordinator = checkNotNull(failoverCoordinator);
         }
 
         @Override
         public void changeToActive() {
             taskGroupCoordinator.start();
+            
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
 -> {
+                try {
+                    failoverCoordinator.cleanHistoryFailoverFinishedMarks();
+                } catch (Exception e) {
+                    log.error("FailoverCoordinator 
cleanHistoryFailoverFinishedMarks failed", e);
+                }
+            }, 0, 1, TimeUnit.DAYS);
         }
 
         @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 6935e41622..bbbc7671be 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.failover;
 
+import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -33,11 +34,14 @@ import 
org.apache.dolphinscheduler.server.master.engine.system.event.WorkerFailo
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.time.StopWatch;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
@@ -208,6 +212,32 @@ public class FailoverCoordinator implements 
IFailoverCoordinator {
                         workerServerMetadata.getProcessId()));
     }
 
+    @Override
+    public void cleanHistoryFailoverFinishedMarks() {
+        // Clean the history failover finished nodes
+        // which failover is before the current time minus 1 week
+        final Collection<String> failoverFinishedMarkSuffixes =
+                
registryClient.getChildrenKeys(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
+        if (CollectionUtils.isEmpty(failoverFinishedMarkSuffixes)) {
+            return;
+        }
+        for (final String failoverFinishedMarkSuffix : 
failoverFinishedMarkSuffixes) {
+            final String failoverFinishedMarkFullPath = 
RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath()
+                    + Constants.SINGLE_SLASH + failoverFinishedMarkSuffix;
+            try {
+                final String failoverFinishTime = 
registryClient.get(failoverFinishedMarkFullPath);
+                if (System.currentTimeMillis() - 
Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
+                    registryClient.remove(failoverFinishedMarkFullPath);
+                    log.info(
+                            "Clear the failover finished node: {} which 
failover time is before the current time minus 1 week",
+                            failoverFinishedMarkFullPath);
+                }
+            } catch (Exception ex) {
+                log.error("Failed to clean the failoverFinishedNode: {}", 
failoverFinishedMarkFullPath, ex);
+            }
+        }
+    }
+
     private void doWorkerFailover(final String workerAddress,
                                   final long taskFailoverDeadline,
                                   final String workerFailoverNodePath) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
index f578eb75f1..bec96792f6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
@@ -47,4 +47,10 @@ public interface IFailoverCoordinator {
      * <p> This method is called when a worker server is removed from the 
cluster.
      */
     void failoverWorker(final WorkerFailoverEvent workerFailoverEvent);
+
+    /**
+     * Once the failover is finished, we will persist a mark in registry 
center.
+     * <p> This method is used to clean the finished marks which has expired 
to avoid the data accumulation.
+     */
+    void cleanHistoryFailoverFinishedMarks();
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index 86ab82ea31..92a8cd7be5 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -71,7 +70,6 @@ public class RegistryClient {
         if 
(!registry.exists(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath())) {
             
registry.put(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath(), EMPTY, 
false);
         }
-        cleanHistoryFailoverFinishedNodes();
     }
 
     public boolean isConnected() {
@@ -233,28 +231,4 @@ public class RegistryClient {
         return getChildrenKeys(nodeType.getRegistryPath());
     }
 
-    private void cleanHistoryFailoverFinishedNodes() {
-        // Clean the history failover finished nodes
-        // which failover is before the current time minus 1 week
-        final Collection<String> failoverFinishedNodes =
-                
registry.children(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
-        if (CollectionUtils.isEmpty(failoverFinishedNodes)) {
-            return;
-        }
-        for (final String failoverFinishedNode : failoverFinishedNodes) {
-            final String failoverFinishedNodePath = 
RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath()
-                    + Constants.SINGLE_SLASH + failoverFinishedNode;
-            try {
-                final String failoverFinishTime = 
registry.get(failoverFinishedNodePath);
-                if (System.currentTimeMillis() - 
Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
-                    registry.delete(failoverFinishedNodePath);
-                    log.info(
-                            "Clear the failover finished node: {} which 
failover time is before the current time minus 1 week",
-                            failoverFinishedNodePath);
-                }
-            } catch (Exception ex) {
-                log.error("Failed to clean the failoverFinishedNode: {}", 
failoverFinishedNodePath, ex);
-            }
-        }
-    }
 }

Reply via email to