This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new b324ae24f [AMORO-3994] Support Exposing AMS High Availability (HA) Status (#3996)
b324ae24f is described below
commit b324ae24f001e4788e3bf71d0dadc40d129ad1e7
Author: davedwwang <[email protected]>
AuthorDate: Thu Jan 8 19:57:51 2026 +0800
[AMORO-3994] Support Exposing AMS High Availability (HA) Status (#3996)
[AMORO-3994] Support Exposing AMS High Availability (HA) Status
Co-authored-by: davedwwang <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 52 ++++++++++++++++++++--
.../org/apache/amoro/server/AmsServiceMetrics.java | 23 +++++++++-
.../amoro/server/dashboard/DashboardServer.java | 6 ++-
.../controller/HealthCheckController.java | 16 ++++++-
4 files changed, 89 insertions(+), 8 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 18cdf8666..ad19bbe93 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -114,6 +114,7 @@ public class AmoroServiceContainer {
private TServer optimizingServiceServer;
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
+ private volatile HAState haState = HAState.INITIALIZING; // written by the HA loop, read by HTTP/metric threads
public AmoroServiceContainer() throws Exception {
initConfig();
@@ -134,13 +135,15 @@ public class AmoroServiceContainer {
service.startRestServices();
while (true) {
try {
+ // Used to block AMS instances that have not acquired leadership
service.waitLeaderShip();
- service.startOptimizingService();
+ service.transitionToLeader();
+ // Used to block AMS instances that have acquired leadership
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
- service.disposeOptimizingService();
+ service.transitionToFollower();
}
}
} catch (Throwable t) {
@@ -149,6 +152,26 @@ public class AmoroServiceContainer {
}
}
+ public enum HAState { // HA role of this AMS process, updated by transitionToLeader/Follower
+ INITIALIZING(0), // initial value, before the first transition
+ FOLLOWER(1),
+ LEADER(2);
+
+ private final int code; // numeric value exported by the ams_ha_state gauge
+
+ HAState(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+ }
+
+ public HAState getHaState() { // current HA role of this instance
+ return haState;
+ }
+
public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
@@ -171,6 +194,22 @@ public class AmoroServiceContainer {
registerAmsServiceMetric();
}
+ public void transitionToLeader() throws Exception {
+ if (haState == HAState.LEADER) {
+ return;
+ }
+ startOptimizingService();
+ haState = HAState.LEADER; // reported as leader only after startup succeeded
+ }
+
+ public void transitionToFollower() {
+ // Dispose unconditionally, as the loop did before HA states existed: if
+ // transitionToLeader failed part-way on a later iteration, haState is still
+ // FOLLOWER while some optimizing services may already be running.
+ haState = HAState.FOLLOWER;
+ disposeOptimizingService();
+ }
+
public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new
TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();
@@ -269,6 +308,10 @@ public class AmoroServiceContainer {
new ConfigurationHelper().init();
}
+ public Configurations getServiceConfig() { // AMS configuration loaded by initConfig()
+ return this.serviceConfig;
+ }
+
private void startThriftService() {
startThriftServer(tableManagementServer,
"thrift-table-management-server-thread");
startThriftServer(optimizingServiceServer,
"thrift-optimizing-server-thread");
@@ -284,7 +327,7 @@ public class AmoroServiceContainer {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
- serviceConfig, catalogManager, tableManager, optimizerManager,
terminalManager);
+ serviceConfig, catalogManager, tableManager, optimizerManager,
terminalManager, this);
RestCatalogService restCatalogService = new
RestCatalogService(catalogManager, tableManager);
httpServer =
@@ -360,7 +403,8 @@ public class AmoroServiceContainer {
}
private void registerAmsServiceMetric() {
- amsServiceMetrics = new
AmsServiceMetrics(MetricManager.getInstance().getGlobalRegistry());
+ amsServiceMetrics =
+ new AmsServiceMetrics(MetricManager.getInstance().getGlobalRegistry(),
this);
amsServiceMetrics.register();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsServiceMetrics.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsServiceMetrics.java
index 560e4da77..64e2c591c 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmsServiceMetrics.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsServiceMetrics.java
@@ -76,11 +76,22 @@ public class AmsServiceMetrics {
.withTags(GARBAGE_COLLECTOR_TAG)
.build();
+ public static final String SERVICE_STATE_TAG = "service_state";
+ public static final String AMS_HA_STATE_NAME = "ams_ha_state";
+ public static final MetricDefine AMS_HA_STATE =
+ defineGauge(AMS_HA_STATE_NAME)
+ .withDescription("The HA state of the AMS")
+ .withTags(SERVICE_STATE_TAG)
+ .build();
+
private final MetricRegistry registry;
private List<MetricKey> registeredMetricKeys = Lists.newArrayList();
- public AmsServiceMetrics(MetricRegistry registry) {
+ private final AmoroServiceContainer ams; // source of the HA state and service config
+
+ public AmsServiceMetrics(MetricRegistry registry, AmoroServiceContainer ams)
{
this.registry = registry;
+ this.ams = ams;
}
public void register() {
@@ -88,6 +99,7 @@ public class AmsServiceMetrics {
registerThreadMetric();
registerCPuMetric();
registerGarbageCollectorMetrics();
+ registerHAStateMetrics();
}
public void unregister() {
@@ -140,6 +152,15 @@ public class AmsServiceMetrics {
}
}
+ private void registerHAStateMetrics() {
+ String exposeHost =
ams.getServiceConfig().get(AmoroManagementConf.SERVER_EXPOSE_HOST);
+ // The gauge reads the state on every poll, so it follows later transitions.
+ Gauge<Integer> haStateGauge = () -> ams.getHaState().getCode();
+ registerMetric(
+ registry, AMS_HA_STATE, ImmutableMap.of(SERVICE_STATE_TAG, exposeHost), haStateGauge);
+ }
+
+
private void registerMetric(MetricRegistry registry, MetricDefine define,
Metric metric) {
MetricKey key = registry.register(define, Collections.emptyMap(), metric);
registeredMetricKeys.add(key);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index f24f9c333..5233b7676 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -36,6 +36,7 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.ForbiddenException;
import org.apache.amoro.exception.SignatureCheckException;
import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.AmoroServiceContainer;
import org.apache.amoro.server.RestCatalogService;
import org.apache.amoro.server.authentication.HttpAuthenticationFactory;
import org.apache.amoro.server.catalog.CatalogManager;
@@ -99,10 +100,11 @@ public class DashboardServer {
CatalogManager catalogManager,
TableManager tableManager,
OptimizerManager optimizerManager,
- TerminalManager terminalManager) {
+ TerminalManager terminalManager,
+ AmoroServiceContainer ams) {
PlatformFileManager platformFileManager = new PlatformFileManager();
this.catalogController = new CatalogController(catalogManager,
platformFileManager);
- this.healthCheckController = new HealthCheckController();
+ this.healthCheckController = new HealthCheckController(ams);
this.loginController = new LoginController(serviceConfig);
this.optimizerGroupController = new OptimizerGroupController(tableManager,
optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/HealthCheckController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/HealthCheckController.java
index 23eff8f91..b2e229a37 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/HealthCheckController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/HealthCheckController.java
@@ -18,13 +18,27 @@
package org.apache.amoro.server.dashboard.controller;
+import static org.apache.amoro.server.AmsServiceMetrics.AMS_HA_STATE_NAME;
+
import io.javalin.http.Context;
+import org.apache.amoro.server.AmoroServiceContainer;
import org.apache.amoro.server.dashboard.response.OkResponse;
+import java.util.HashMap;
+import java.util.Map;
+
/** The controller that handles health check requests. */
public class HealthCheckController {
+ private final AmoroServiceContainer ams; // queried for the current HA state
+
+ public HealthCheckController(AmoroServiceContainer ams) {
+ this.ams = ams;
+ }
+
public void healthCheck(Context ctx) {
- ctx.json(OkResponse.of(null));
+ Map<String, String> healthCheckResult = new HashMap<>(4);
+ healthCheckResult.put(AMS_HA_STATE_NAME, ams.getHaState().toString());
+ ctx.json(OkResponse.of(healthCheckResult));
}
}