This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f92c8f6 [AMORO-2810] Support rest service for follower AMS (#3737)
75f92c8f6 is described below

commit 75f92c8f6302dc8108fe87ac38d65a271b54189d
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Aug 20 10:40:28 2025 +0800

    [AMORO-2810] Support rest service for follower AMS (#3737)
    
    * Support multiple nodes to access Amoro Rest service in a high 
availability environment
    
    * Fix conflicts
    
    * server dispose rename
    
    * Split dispose processing of rest service and optimizing service
    
    * as head
    
    * Remove useless codes
    
    ---------
    
    Co-authored-by: ConradJam <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java | 50 +++++++++++++---------
 .../org/apache/amoro/server/AmsEnvironment.java    |  3 +-
 2 files changed, 32 insertions(+), 21 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 418b60a07..4d418d7f5 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -126,15 +126,16 @@ public class AmoroServiceContainer {
                     service.dispose();
                     LOG.info("AMS service has been shut down");
                   }));
+      service.startRestServices();
       while (true) {
         try {
           service.waitLeaderShip();
-          service.startService();
+          service.startOptimizingService();
           service.waitFollowerShip();
         } catch (Exception e) {
           LOG.error("AMS start error", e);
         } finally {
-          service.dispose();
+          service.disposeOptimizingService();
         }
       }
     } catch (Throwable t) {
@@ -151,14 +152,21 @@ public class AmoroServiceContainer {
     haContainer.waitFollowerShip();
   }
 
-  public void startService() throws Exception {
+  public void startRestServices() throws Exception {
     EventsManager.getInstance();
     MetricManager.getInstance();
 
     catalogManager = new DefaultCatalogManager(serviceConfig);
     tableManager = new DefaultTableManager(serviceConfig, catalogManager);
     optimizerManager = new DefaultOptimizerManager(serviceConfig, 
catalogManager);
+    terminalManager = new TerminalManager(serviceConfig, catalogManager);
+
+    initHttpService();
+    startHttpService();
+    registerAmsServiceMetric();
+  }
 
+  public void startOptimizingService() throws Exception {
     tableService = new DefaultTableService(serviceConfig, catalogManager);
 
     optimizingService =
@@ -180,14 +188,9 @@ public class AmoroServiceContainer {
     tableService.initialize();
     LOG.info("AMS table service have been initialized");
     tableManager.setTableService(tableService);
-    terminalManager = new TerminalManager(serviceConfig, catalogManager);
 
     initThriftService();
     startThriftService();
-
-    initHttpService();
-    startHttpService();
-    registerAmsServiceMetric();
   }
 
   private void addHandlerChain(RuntimeHandlerChain chain) {
@@ -196,7 +199,7 @@ public class AmoroServiceContainer {
     }
   }
 
-  public void dispose() {
+  public void disposeOptimizingService() {
     if (tableManagementServer != null && tableManagementServer.isServing()) {
       LOG.info("Stopping table management server...");
       tableManagementServer.stop();
@@ -205,6 +208,19 @@ public class AmoroServiceContainer {
       LOG.info("Stopping optimizing server...");
       optimizingServiceServer.stop();
     }
+    if (tableService != null) {
+      LOG.info("Stopping table service...");
+      tableService.dispose();
+      tableService = null;
+    }
+    if (optimizingService != null) {
+      LOG.info("Stopping optimizing service...");
+      optimizingService.dispose();
+      optimizingService = null;
+    }
+  }
+
+  public void disposeRestService() {
     if (httpServer != null) {
       LOG.info("Stopping http server...");
       try {
@@ -213,22 +229,11 @@ public class AmoroServiceContainer {
         LOG.error("Error stopping http server", e);
       }
     }
-    if (tableService != null) {
-      LOG.info("Stopping table service...");
-      tableService.dispose();
-      tableService = null;
-    }
     if (terminalManager != null) {
       LOG.info("Stopping terminal manager...");
       terminalManager.dispose();
       terminalManager = null;
     }
-    if (optimizingService != null) {
-      LOG.info("Stopping optimizing service...");
-      optimizingService.dispose();
-      optimizingService = null;
-    }
-
     if (amsServiceMetrics != null) {
       amsServiceMetrics.unregister();
     }
@@ -237,6 +242,11 @@ public class AmoroServiceContainer {
     MetricManager.dispose();
   }
 
+  public void dispose() {
+    disposeOptimizingService();
+    disposeRestService();
+  }
+
   private void initConfig() throws Exception {
     LOG.info("initializing configurations...");
     new ConfigurationHelper().init();
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 39384c68c..827135da4 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -315,7 +315,8 @@ public class AmsEnvironment {
             AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, 
optimizingServiceBindPort);
         serviceConfig.set(
             AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL, 
Duration.ofMillis(1000L));
-        serviceContainer.startService();
+        serviceContainer.startRestServices();
+        serviceContainer.startOptimizingService();
         LOG.info("Started test AMS.");
         break;
       } catch (TTransportException e) {

Reply via email to