This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cb9330017 [Improvement] Optimize the AbstractHAServer implementation
(#16810)
3cb9330017 is described below
commit 3cb93300177c246e2a903a4e8f6ea18f9fad1aa4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Nov 18 09:21:04 2024 +0800
[Improvement] Optimize the AbstractHAServer implementation (#16810)
Co-authored-by: xiangzihao <[email protected]>
---
.../apache/dolphinscheduler/alert/AlertServer.java | 44 ++++++++++++-
.../alert/service/AlertBootstrapService.java | 30 +--------
.../alert/service/AlertHAServer.java | 15 ++++-
.../registry/api/enums/RegistryNodeType.java | 6 +-
.../registry/api/ha/AbstractHAServer.java | 72 +++++++++++++---------
.../api/ha/AbstractServerStatusChangeListener.java | 1 -
.../dolphinscheduler/registry/api/ha/HAServer.java | 5 +-
7 files changed, 106 insertions(+), 67 deletions(-)
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ff2e985426..7a6d941b70 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -18,7 +18,11 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
+import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
+import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@@ -26,6 +30,7 @@ import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
+import
org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -44,6 +49,18 @@ import org.springframework.context.annotation.Import;
@SpringBootApplication
public class AlertServer {
+ @Autowired
+ private AlertRpcServer alertRpcServer;
+
+ @Autowired
+ private AlertPluginManager alertPluginManager;
+
+ @Autowired
+ private AlertRegistryClient alertRegistryClient;
+
+ @Autowired
+ private AlertHAServer alertHAServer;
+
@Autowired
private AlertBootstrapService alertBootstrapService;
@@ -58,7 +75,25 @@ public class AlertServer {
public void run() {
ServerLifeCycleManager.toRunning();
log.info("AlertServer is staring ...");
- alertBootstrapService.start();
+ alertPluginManager.start();
+ alertRpcServer.start();
+ alertRegistryClient.start();
+
+ alertHAServer.addServerStatusChangeListener(new
AbstractServerStatusChangeListener() {
+
+ @Override
+ public void changeToActive() {
+ alertBootstrapService.start();
+ }
+
+ @Override
+ public void changeToStandBy() {
+ close();
+ }
+ });
+
+ alertHAServer.start();
+
log.info("AlertServer is started ...");
}
@@ -73,7 +108,12 @@ public class AlertServer {
return;
}
log.info("AlertServer is stopping, cause: {}", cause);
- alertBootstrapService.close();
+ try (
+ final AlertRpcServer ignore = alertRpcServer;
+ final AlertRegistryClient ignore1 = alertRegistryClient;
+ final AlertHAServer ignore2 = alertHAServer;
+ final AlertBootstrapService ignore3 =
alertBootstrapService;) {
+ }
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
log.info("AlertServer stopped, cause: {}", cause);
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
index 1da1b8180f..2cefe896d3 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.alert.service;
-import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
@@ -32,39 +31,21 @@ import org.springframework.stereotype.Service;
@Service
public final class AlertBootstrapService implements AutoCloseable {
- private final AlertRpcServer alertRpcServer;
-
- private final AlertRegistryClient alertRegistryClient;
-
- private final AlertPluginManager alertPluginManager;
-
- private final AlertHAServer alertHAServer;
-
private final AlertEventFetcher alertEventFetcher;
private final AlertEventLoop alertEventLoop;
public AlertBootstrapService(AlertRpcServer alertRpcServer,
AlertRegistryClient alertRegistryClient,
- AlertPluginManager alertPluginManager,
AlertHAServer alertHAServer,
AlertEventFetcher alertEventFetcher,
AlertEventLoop alertEventLoop) {
- this.alertRpcServer = alertRpcServer;
- this.alertRegistryClient = alertRegistryClient;
- this.alertPluginManager = alertPluginManager;
- this.alertHAServer = alertHAServer;
this.alertEventFetcher = alertEventFetcher;
this.alertEventLoop = alertEventLoop;
}
public void start() {
log.info("AlertBootstrapService starting...");
- alertPluginManager.start();
- alertRpcServer.start();
- alertRegistryClient.start();
- alertHAServer.start();
-
alertEventFetcher.start();
alertEventLoop.start();
log.info("AlertBootstrapService started...");
@@ -73,15 +54,8 @@ public final class AlertBootstrapService implements
AutoCloseable {
@Override
public void close() {
log.info("AlertBootstrapService stopping...");
- try (
- AlertRpcServer closedAlertRpcServer = alertRpcServer;
- AlertRegistryClient closedAlertRegistryClient =
alertRegistryClient) {
- // close resource
- alertEventFetcher.shutdown();
-
- alertEventLoop.shutdown();
- alertHAServer.shutdown();
- }
+ alertEventFetcher.shutdown();
+ alertEventLoop.shutdown();
log.info("AlertBootstrapService stopped...");
}
}
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
index 998bc655c4..67382ac470 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.alert.service;
+import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
@@ -29,8 +30,18 @@ import org.springframework.stereotype.Component;
@Component
public class AlertHAServer extends AbstractHAServer {
- public AlertHAServer(Registry registry) {
- super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
+ public AlertHAServer(final Registry registry, final AlertConfig
alertConfig) {
+ super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(),
alertConfig.getAlertServerAddress());
}
+ @Override
+ public void start() {
+ super.start();
+ log.info("AlertHAServer started...");
+ }
+
+ @Override
+ public void close() {
+ log.info("AlertHAServer shutdown...");
+ }
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 7aaa5baa40..31a0f8d5aa 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -24,15 +24,15 @@ import lombok.Getter;
@AllArgsConstructor
public enum RegistryNodeType {
- ALL_SERVERS("nodes", "/nodes"),
MASTER("Master", "/nodes/master"),
- MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock",
"/lock/master-task-group-coordinator"),
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator",
"/lock/master-serial-workflow-coordinator"),
+
WORKER("Worker", "/nodes/worker"),
+
ALERT_SERVER("AlertServer", "/nodes/alert-server"),
- ALERT_LOCK("AlertNodeLock", "/lock/alert");
+ ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");
private final String name;
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
index 5dca5552b3..822b4d93f4 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
@@ -17,13 +17,12 @@
package org.apache.dolphinscheduler.registry.api.ha;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {
private final Registry registry;
- private final String serverPath;
+ private final String selectorPath;
+
+ private final String serverIdentify;
private ServerStatus serverStatus;
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
- public AbstractHAServer(Registry registry, String serverPath) {
+ public AbstractHAServer(final Registry registry, final String
selectorPath, final String serverIdentify) {
this.registry = registry;
- this.serverPath = serverPath;
+ this.selectorPath = checkNotNull(selectorPath);
+ this.serverIdentify = checkNotNull(serverIdentify);
this.serverStatus = ServerStatus.STAND_BY;
this.serverStatusChangeListeners = Lists.newArrayList(new
DefaultServerStatusChangeListener());
}
@Override
public void start() {
- registry.subscribe(serverPath, event -> {
+ registry.subscribe(selectorPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
- if (isActive() && !participateElection()) {
+ if (serverIdentify.equals(event.data())) {
statusChange(ServerStatus.STAND_BY);
+ } else {
+ if (participateElection()) {
+ statusChange(ServerStatus.ACTIVE);
+ }
}
}
});
- ScheduledExecutorService electionSelectionThread =
-
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
- electionSelectionThread.schedule(() -> {
- if (isActive()) {
- return;
- }
- if (participateElection()) {
- statusChange(ServerStatus.ACTIVE);
- }
- }, 10, TimeUnit.SECONDS);
+
+ if (participateElection()) {
+ statusChange(ServerStatus.ACTIVE);
+ } else {
+ log.info("Server {} is standby", serverIdentify);
+ }
}
@Override
@@ -75,7 +77,22 @@ public abstract class AbstractHAServer implements HAServer {
@Override
public boolean participateElection() {
- return registry.acquireLock(serverPath, 3_000);
+ final String electionLock = selectorPath + "-lock";
+ try {
+ if (registry.acquireLock(electionLock)) {
+ if (!registry.exists(selectorPath)) {
+ registry.put(selectorPath, serverIdentify, true);
+ return true;
+ }
+ return serverIdentify.equals(registry.get(selectorPath));
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("participate election error", e);
+ return false;
+ } finally {
+ registry.releaseLock(electionLock);
+ }
}
@Override
@@ -88,18 +105,15 @@ public abstract class AbstractHAServer implements HAServer
{
return serverStatus;
}
- @Override
- public void shutdown() {
- if (isActive()) {
- registry.releaseLock(serverPath);
- }
- }
-
private void statusChange(ServerStatus targetStatus) {
+ final ServerStatus originStatus = serverStatus;
+ serverStatus = targetStatus;
synchronized (this) {
- ServerStatus originStatus = serverStatus;
- serverStatus = targetStatus;
- serverStatusChangeListeners.forEach(listener ->
listener.change(originStatus, serverStatus));
+ try {
+ serverStatusChangeListeners.forEach(listener ->
listener.change(originStatus, serverStatus));
+ } catch (Exception ex) {
+ log.error("Trigger ServerStatusChangeListener from {} -> {}
error", originStatus, targetStatus, ex);
+ }
}
}
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
index f2e332ea20..8b74e08fcc 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
@@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener
implements ServerStatus
@Override
public void change(HAServer.ServerStatus originStatus,
HAServer.ServerStatus currentStatus) {
- log.info("The status change from {} to {}.", originStatus,
currentStatus);
if (originStatus == HAServer.ServerStatus.ACTIVE) {
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
changeToStandBy();
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
index 6a79e6eb84..4fc7929cc8 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java
@@ -21,7 +21,7 @@ package org.apache.dolphinscheduler.registry.api.ha;
* Interface for HA server, used to select a active server from multiple
servers.
* In HA mode, there are multiple servers, only one server is active, others
are standby.
*/
-public interface HAServer {
+public interface HAServer extends AutoCloseable {
/**
* Start the server.
@@ -57,7 +57,8 @@ public interface HAServer {
/**
* Shutdown the server, release resources.
*/
- void shutdown();
+ @Override
+ void close();
enum ServerStatus {
ACTIVE,