This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 92edbb416d [Improvement-17664][Master] Regularly clean the
failover mark in registry (#17667)
92edbb416d is described below
commit 92edbb416da8d4dcfd8fe8ceea7c6838c4f55d01
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Nov 22 16:55:29 2025 +0800
[Improvement-17664][Master] Regularly clean the failover mark in
registry (#17667)
---
.../server/master/engine/MasterCoordinator.java | 25 +++++++++++++++---
.../master/failover/FailoverCoordinator.java | 30 ++++++++++++++++++++++
.../master/failover/IFailoverCoordinator.java | 6 +++++
.../registry/api/RegistryClient.java | 26 -------------------
4 files changed, 58 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
index eaaa6187c9..da2f9c156c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java
@@ -24,6 +24,10 @@ import
org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
import
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.failover.IFailoverCoordinator;
+import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -38,15 +42,19 @@ public class MasterCoordinator extends AbstractHAServer {
private final ITaskGroupCoordinator taskGroupCoordinator;
+ private final IFailoverCoordinator failoverCoordinator;
+
public MasterCoordinator(final Registry registry,
final MasterConfig masterConfig,
- final ITaskGroupCoordinator taskGroupCoordinator)
{
+ final ITaskGroupCoordinator taskGroupCoordinator,
+ final IFailoverCoordinator failoverCoordinator) {
super(
registry,
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
masterConfig.getMasterAddress());
this.taskGroupCoordinator = taskGroupCoordinator;
- addServerStatusChangeListener(new
MasterCoordinatorListener(taskGroupCoordinator));
+ this.failoverCoordinator = failoverCoordinator;
+ addServerStatusChangeListener(new
MasterCoordinatorListener(taskGroupCoordinator, failoverCoordinator));
}
@Override
@@ -65,13 +73,24 @@ public class MasterCoordinator extends AbstractHAServer {
private final ITaskGroupCoordinator taskGroupCoordinator;
- public MasterCoordinatorListener(ITaskGroupCoordinator
taskGroupCoordinator) {
+ private final IFailoverCoordinator failoverCoordinator;
+
+ public MasterCoordinatorListener(ITaskGroupCoordinator
taskGroupCoordinator,
+ IFailoverCoordinator
failoverCoordinator) {
this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
+ this.failoverCoordinator = checkNotNull(failoverCoordinator);
}
@Override
public void changeToActive() {
taskGroupCoordinator.start();
+
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(()
-> {
+ try {
+ failoverCoordinator.cleanHistoryFailoverFinishedMarks();
+ } catch (Exception e) {
+ log.error("FailoverCoordinator
cleanHistoryFailoverFinishedMarks failed", e);
+ }
+ }, 0, 1, TimeUnit.DAYS);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 6935e41622..bbbc7671be 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.failover;
+import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -33,11 +34,14 @@ import
org.apache.dolphinscheduler.server.master.engine.system.event.WorkerFailo
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -208,6 +212,32 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
workerServerMetadata.getProcessId()));
}
+ @Override
+ public void cleanHistoryFailoverFinishedMarks() {
+ // Clean the history failover finished nodes
+ // which failover is before the current time minus 1 week
+ final Collection<String> failoverFinishedMarkSuffixes =
+
registryClient.getChildrenKeys(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
+ if (CollectionUtils.isEmpty(failoverFinishedMarkSuffixes)) {
+ return;
+ }
+ for (final String failoverFinishedMarkSuffix :
failoverFinishedMarkSuffixes) {
+ final String failoverFinishedMarkFullPath =
RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath()
+ + Constants.SINGLE_SLASH + failoverFinishedMarkSuffix;
+ try {
+ final String failoverFinishTime =
registryClient.get(failoverFinishedMarkFullPath);
+ if (System.currentTimeMillis() -
Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
+ registryClient.remove(failoverFinishedMarkFullPath);
+ log.info(
+ "Clear the failover finished node: {} which
failover time is before the current time minus 1 week",
+ failoverFinishedMarkFullPath);
+ }
+ } catch (Exception ex) {
+ log.error("Failed to clean the failoverFinishedNode: {}",
failoverFinishedMarkFullPath, ex);
+ }
+ }
+ }
+
private void doWorkerFailover(final String workerAddress,
final long taskFailoverDeadline,
final String workerFailoverNodePath) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
index f578eb75f1..bec96792f6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/IFailoverCoordinator.java
@@ -47,4 +47,10 @@ public interface IFailoverCoordinator {
* <p> This method is called when a worker server is removed from the
cluster.
*/
void failoverWorker(final WorkerFailoverEvent workerFailoverEvent);
+
+ /**
+ * Once the failover is finished, we will persist a mark in registry
center.
+ * <p> This method is used to clean the finished marks which have expired
to avoid data accumulation.
+ */
+ void cleanHistoryFailoverFinishedMarks();
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index 86ab82ea31..92a8cd7be5 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -41,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -71,7 +70,6 @@ public class RegistryClient {
if
(!registry.exists(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath())) {
registry.put(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath(), EMPTY,
false);
}
- cleanHistoryFailoverFinishedNodes();
}
public boolean isConnected() {
@@ -233,28 +231,4 @@ public class RegistryClient {
return getChildrenKeys(nodeType.getRegistryPath());
}
- private void cleanHistoryFailoverFinishedNodes() {
- // Clean the history failover finished nodes
- // which failover is before the current time minus 1 week
- final Collection<String> failoverFinishedNodes =
-
registry.children(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
- if (CollectionUtils.isEmpty(failoverFinishedNodes)) {
- return;
- }
- for (final String failoverFinishedNode : failoverFinishedNodes) {
- final String failoverFinishedNodePath =
RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath()
- + Constants.SINGLE_SLASH + failoverFinishedNode;
- try {
- final String failoverFinishTime =
registry.get(failoverFinishedNodePath);
- if (System.currentTimeMillis() -
Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
- registry.delete(failoverFinishedNodePath);
- log.info(
- "Clear the failover finished node: {} which
failover time is before the current time minus 1 week",
- failoverFinishedNodePath);
- }
- } catch (Exception ex) {
- log.error("Failed to clean the failoverFinishedNode: {}",
failoverFinishedNodePath, ex);
- }
- }
- }
}