This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 75f92c8f6 [AMORO-2810] Support rest service for follower AMS (#3737)
75f92c8f6 is described below
commit 75f92c8f6302dc8108fe87ac38d65a271b54189d
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Aug 20 10:40:28 2025 +0800
[AMORO-2810] Support rest service for follower AMS (#3737)
* Support multiple nodes to access Amoro Rest service in a high
availability environment
* Fix conflicts
* server dispose rename
* Split dispose processing of rest service and optimizing service
* as head
* Remove useless codes
---------
Co-authored-by: ConradJam <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 50 +++++++++++++---------
.../org/apache/amoro/server/AmsEnvironment.java | 3 +-
2 files changed, 32 insertions(+), 21 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 418b60a07..4d418d7f5 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -126,15 +126,16 @@ public class AmoroServiceContainer {
service.dispose();
LOG.info("AMS service has been shut down");
}));
+ service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
- service.startService();
+ service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
- service.dispose();
+ service.disposeOptimizingService();
}
}
} catch (Throwable t) {
@@ -151,14 +152,21 @@ public class AmoroServiceContainer {
haContainer.waitFollowerShip();
}
- public void startService() throws Exception {
+ public void startRestServices() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();
catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
optimizerManager = new DefaultOptimizerManager(serviceConfig,
catalogManager);
+ terminalManager = new TerminalManager(serviceConfig, catalogManager);
+
+ initHttpService();
+ startHttpService();
+ registerAmsServiceMetric();
+ }
+ public void startOptimizingService() throws Exception {
tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService =
@@ -180,14 +188,9 @@ public class AmoroServiceContainer {
tableService.initialize();
LOG.info("AMS table service have been initialized");
tableManager.setTableService(tableService);
- terminalManager = new TerminalManager(serviceConfig, catalogManager);
initThriftService();
startThriftService();
-
- initHttpService();
- startHttpService();
- registerAmsServiceMetric();
}
private void addHandlerChain(RuntimeHandlerChain chain) {
@@ -196,7 +199,7 @@ public class AmoroServiceContainer {
}
}
- public void dispose() {
+ public void disposeOptimizingService() {
if (tableManagementServer != null && tableManagementServer.isServing()) {
LOG.info("Stopping table management server...");
tableManagementServer.stop();
@@ -205,6 +208,19 @@ public class AmoroServiceContainer {
LOG.info("Stopping optimizing server...");
optimizingServiceServer.stop();
}
+ if (tableService != null) {
+ LOG.info("Stopping table service...");
+ tableService.dispose();
+ tableService = null;
+ }
+ if (optimizingService != null) {
+ LOG.info("Stopping optimizing service...");
+ optimizingService.dispose();
+ optimizingService = null;
+ }
+ }
+
+ public void disposeRestService() {
if (httpServer != null) {
LOG.info("Stopping http server...");
try {
@@ -213,22 +229,11 @@ public class AmoroServiceContainer {
LOG.error("Error stopping http server", e);
}
}
- if (tableService != null) {
- LOG.info("Stopping table service...");
- tableService.dispose();
- tableService = null;
- }
if (terminalManager != null) {
LOG.info("Stopping terminal manager...");
terminalManager.dispose();
terminalManager = null;
}
- if (optimizingService != null) {
- LOG.info("Stopping optimizing service...");
- optimizingService.dispose();
- optimizingService = null;
- }
-
if (amsServiceMetrics != null) {
amsServiceMetrics.unregister();
}
@@ -237,6 +242,11 @@ public class AmoroServiceContainer {
MetricManager.dispose();
}
+ public void dispose() {
+ disposeOptimizingService();
+ disposeRestService();
+ }
+
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 39384c68c..827135da4 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -315,7 +315,8 @@ public class AmsEnvironment {
AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
optimizingServiceBindPort);
serviceConfig.set(
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
Duration.ofMillis(1000L));
- serviceContainer.startService();
+ serviceContainer.startRestServices();
+ serviceContainer.startOptimizingService();
LOG.info("Started test AMS.");
break;
} catch (TTransportException e) {