This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cb9330017 [Improvement] Optimize the AbstractHAServer implementation 
(#16810)
3cb9330017 is described below

commit 3cb93300177c246e2a903a4e8f6ea18f9fad1aa4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Nov 18 09:21:04 2024 +0800

    [Improvement] Optimize the AbstractHAServer implementation (#16810)
    
    Co-authored-by: xiangzihao <[email protected]>
---
 .../apache/dolphinscheduler/alert/AlertServer.java | 44 ++++++++++++-
 .../alert/service/AlertBootstrapService.java       | 30 +--------
 .../alert/service/AlertHAServer.java               | 15 ++++-
 .../registry/api/enums/RegistryNodeType.java       |  6 +-
 .../registry/api/ha/AbstractHAServer.java          | 72 +++++++++++++---------
 .../api/ha/AbstractServerStatusChangeListener.java |  1 -
 .../dolphinscheduler/registry/api/ha/HAServer.java |  5 +-
 7 files changed, 106 insertions(+), 67 deletions(-)

diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ff2e985426..7a6d941b70 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -18,7 +18,11 @@
 package org.apache.dolphinscheduler.alert;
 
 import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
+import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
 import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
+import org.apache.dolphinscheduler.alert.service.AlertHAServer;
 import org.apache.dolphinscheduler.common.CommonConfiguration;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@@ -26,6 +30,7 @@ import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.DaoConfiguration;
 import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
+import 
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -44,6 +49,18 @@ import org.springframework.context.annotation.Import;
 @SpringBootApplication
 public class AlertServer {
 
+    @Autowired
+    private AlertRpcServer alertRpcServer;
+
+    @Autowired
+    private AlertPluginManager alertPluginManager;
+
+    @Autowired
+    private AlertRegistryClient alertRegistryClient;
+
+    @Autowired
+    private AlertHAServer alertHAServer;
+
     @Autowired
     private AlertBootstrapService alertBootstrapService;
 
@@ -58,7 +75,25 @@ public class AlertServer {
     public void run() {
         ServerLifeCycleManager.toRunning();
         log.info("AlertServer is staring ...");
-        alertBootstrapService.start();
+        alertPluginManager.start();
+        alertRpcServer.start();
+        alertRegistryClient.start();
+
+        alertHAServer.addServerStatusChangeListener(new 
AbstractServerStatusChangeListener() {
+
+            @Override
+            public void changeToActive() {
+                alertBootstrapService.start();
+            }
+
+            @Override
+            public void changeToStandBy() {
+                close();
+            }
+        });
+
+        alertHAServer.start();
+
         log.info("AlertServer is started ...");
     }
 
@@ -73,7 +108,12 @@ public class AlertServer {
                 return;
             }
             log.info("AlertServer is stopping, cause: {}", cause);
-            alertBootstrapService.close();
+            try (
+                    final AlertRpcServer ignore = alertRpcServer;
+                    final AlertRegistryClient ignore1 = alertRegistryClient;
+                    final AlertHAServer ignore2 = alertHAServer;
+                    final AlertBootstrapService ignore3 = 
alertBootstrapService;) {
+            }
             // thread sleep 3 seconds for thread quietly stop
             ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
             log.info("AlertServer stopped, cause: {}", cause);
diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
index 1da1b8180f..2cefe896d3 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.alert.service;
 
-import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
 import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
 import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
 
@@ -32,39 +31,21 @@ import org.springframework.stereotype.Service;
 @Service
 public final class AlertBootstrapService implements AutoCloseable {
 
-    private final AlertRpcServer alertRpcServer;
-
-    private final AlertRegistryClient alertRegistryClient;
-
-    private final AlertPluginManager alertPluginManager;
-
-    private final AlertHAServer alertHAServer;
-
     private final AlertEventFetcher alertEventFetcher;
 
     private final AlertEventLoop alertEventLoop;
 
     public AlertBootstrapService(AlertRpcServer alertRpcServer,
                                  AlertRegistryClient alertRegistryClient,
-                                 AlertPluginManager alertPluginManager,
                                  AlertHAServer alertHAServer,
                                  AlertEventFetcher alertEventFetcher,
                                  AlertEventLoop alertEventLoop) {
-        this.alertRpcServer = alertRpcServer;
-        this.alertRegistryClient = alertRegistryClient;
-        this.alertPluginManager = alertPluginManager;
-        this.alertHAServer = alertHAServer;
         this.alertEventFetcher = alertEventFetcher;
         this.alertEventLoop = alertEventLoop;
     }
 
     public void start() {
         log.info("AlertBootstrapService starting...");
-        alertPluginManager.start();
-        alertRpcServer.start();
-        alertRegistryClient.start();
-        alertHAServer.start();
-
         alertEventFetcher.start();
         alertEventLoop.start();
         log.info("AlertBootstrapService started...");
@@ -73,15 +54,8 @@ public final class AlertBootstrapService implements 
AutoCloseable {
     @Override
     public void close() {
         log.info("AlertBootstrapService stopping...");
-        try (
-                AlertRpcServer closedAlertRpcServer = alertRpcServer;
-                AlertRegistryClient closedAlertRegistryClient = 
alertRegistryClient) {
-            // close resource
-            alertEventFetcher.shutdown();
-
-            alertEventLoop.shutdown();
-            alertHAServer.shutdown();
-        }
+        alertEventFetcher.shutdown();
+        alertEventLoop.shutdown();
         log.info("AlertBootstrapService stopped...");
     }
 }
diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
index 998bc655c4..67382ac470 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.alert.service;
 
+import org.apache.dolphinscheduler.alert.config.AlertConfig;
 import org.apache.dolphinscheduler.registry.api.Registry;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
@@ -29,8 +30,18 @@ import org.springframework.stereotype.Component;
 @Component
 public class AlertHAServer extends AbstractHAServer {
 
-    public AlertHAServer(Registry registry) {
-        super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
+    public AlertHAServer(final Registry registry, final AlertConfig 
alertConfig) {
+        super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), 
alertConfig.getAlertServerAddress());
     }
 
+    @Override
+    public void start() {
+        super.start();
+        log.info("AlertHAServer started...");
+    }
+
+    @Override
+    public void close() {
+        log.info("AlertHAServer shutdown...");
+    }
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 7aaa5baa40..31a0f8d5aa 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -24,15 +24,15 @@ import lombok.Getter;
 @AllArgsConstructor
 public enum RegistryNodeType {
 
-    ALL_SERVERS("nodes", "/nodes"),
     MASTER("Master", "/nodes/master"),
-    MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
     MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
     MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", 
"/lock/master-task-group-coordinator"),
     MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", 
"/lock/master-serial-workflow-coordinator"),
+
     WORKER("Worker", "/nodes/worker"),
+
     ALERT_SERVER("AlertServer", "/nodes/alert-server"),
-    ALERT_LOCK("AlertNodeLock", "/lock/alert");
+    ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");
 
     private final String name;
 
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
index 5dca5552b3..822b4d93f4 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
@@ -17,13 +17,12 @@
 
 package org.apache.dolphinscheduler.registry.api.ha;
 
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Registry;
 
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {
 
     private final Registry registry;
 
-    private final String serverPath;
+    private final String selectorPath;
+
+    private final String serverIdentify;
 
     private ServerStatus serverStatus;
 
     private final List<ServerStatusChangeListener> serverStatusChangeListeners;
 
-    public AbstractHAServer(Registry registry, String serverPath) {
+    public AbstractHAServer(final Registry registry, final String 
selectorPath, final String serverIdentify) {
         this.registry = registry;
-        this.serverPath = serverPath;
+        this.selectorPath = checkNotNull(selectorPath);
+        this.serverIdentify = checkNotNull(serverIdentify);
         this.serverStatus = ServerStatus.STAND_BY;
         this.serverStatusChangeListeners = Lists.newArrayList(new 
DefaultServerStatusChangeListener());
     }
 
     @Override
     public void start() {
-        registry.subscribe(serverPath, event -> {
+        registry.subscribe(selectorPath, event -> {
             if (Event.Type.REMOVE.equals(event.type())) {
-                if (isActive() && !participateElection()) {
+                if (serverIdentify.equals(event.data())) {
                     statusChange(ServerStatus.STAND_BY);
+                } else {
+                    if (participateElection()) {
+                        statusChange(ServerStatus.ACTIVE);
+                    }
                 }
             }
         });
-        ScheduledExecutorService electionSelectionThread =
-                
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
-        electionSelectionThread.schedule(() -> {
-            if (isActive()) {
-                return;
-            }
-            if (participateElection()) {
-                statusChange(ServerStatus.ACTIVE);
-            }
-        }, 10, TimeUnit.SECONDS);
+
+        if (participateElection()) {
+            statusChange(ServerStatus.ACTIVE);
+        } else {
+            log.info("Server {} is standby", serverIdentify);
+        }
     }
 
     @Override
@@ -75,7 +77,22 @@ public abstract class AbstractHAServer implements HAServer {
 
     @Override
     public boolean participateElection() {
-        return registry.acquireLock(serverPath, 3_000);
+        final String electionLock = selectorPath + "-lock";
+        try {
+            if (registry.acquireLock(electionLock)) {
+                if (!registry.exists(selectorPath)) {
+                    registry.put(selectorPath, serverIdentify, true);
+                    return true;
+                }
+                return serverIdentify.equals(registry.get(selectorPath));
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("participate election error", e);
+            return false;
+        } finally {
+            registry.releaseLock(electionLock);
+        }
     }
 
     @Override
@@ -88,18 +105,15 @@ public abstract class AbstractHAServer implements HAServer 
{
         return serverStatus;
     }
 
-    @Override
-    public void shutdown() {
-        if (isActive()) {
-            registry.releaseLock(serverPath);
-        }
-    }
-
     private void statusChange(ServerStatus targetStatus) {
+        final ServerStatus originStatus = serverStatus;
+        serverStatus = targetStatus;
         synchronized (this) {
-            ServerStatus originStatus = serverStatus;
-            serverStatus = targetStatus;
-            serverStatusChangeListeners.forEach(listener -> 
listener.change(originStatus, serverStatus));
+            try {
+                serverStatusChangeListeners.forEach(listener -> 
listener.change(originStatus, serverStatus));
+            } catch (Exception ex) {
+                log.error("Trigger ServerStatusChangeListener from {} -> {} 
error", originStatus, targetStatus, ex);
+            }
         }
     }
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
index f2e332ea20..8b74e08fcc 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
@@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener 
implements ServerStatus
 
     @Override
     public void change(HAServer.ServerStatus originStatus, 
HAServer.ServerStatus currentStatus) {
-        log.info("The status change from {} to {}.", originStatus, 
currentStatus);
         if (originStatus == HAServer.ServerStatus.ACTIVE) {
             if (currentStatus == HAServer.ServerStatus.STAND_BY) {
                 changeToStandBy();
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
index 6a79e6eb84..4fc7929cc8 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
@@ -21,7 +21,7 @@ package org.apache.dolphinscheduler.registry.api.ha;
  * Interface for HA server, used to select a active server from multiple 
servers.
  * In HA mode, there are multiple servers, only one server is active, others 
are standby.
  */
-public interface HAServer {
+public interface HAServer extends AutoCloseable {
 
     /**
      * Start the server.
@@ -57,7 +57,8 @@ public interface HAServer {
     /**
      * Shutdown the server, release resources.
      */
-    void shutdown();
+    @Override
+    void close();
 
     enum ServerStatus {
         ACTIVE,

Reply via email to