This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d2e85d7 [AMORO-3056] Support graceful shutdown for AMS (#3057)
f5d2e85d7 is described below

commit f5d2e85d7bf18b5d77747683d5d24018cf2cc84c
Author: Paul Lin <[email protected]>
AuthorDate: Wed Jul 24 14:48:13 2024 +0800

    [AMORO-3056] Support graceful shutdown for AMS (#3057)
---
 .../org/apache/amoro/server/AmoroServiceContainer.java  | 17 +++++++++++++++--
 .../apache/amoro/server/terminal/TerminalManager.java   |  9 ++++++---
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 52fbe2355..d9527fbae 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -102,6 +102,14 @@ public class AmoroServiceContainer {
   public static void main(String[] args) {
     try {
       AmoroServiceContainer service = new AmoroServiceContainer();
+      Runtime.getRuntime()
+          .addShutdownHook(
+              new Thread(
+                  () -> {
+                    LOG.info("AMS service is shutting down...");
+                    service.dispose();
+                    LOG.info("AMS service has been shut down");
+                  }));
       while (true) {
         try {
           service.waitLeaderShip();
@@ -165,13 +173,16 @@ public class AmoroServiceContainer {
   }
 
   public void dispose() {
-    if (tableManagementServer != null) {
+    if (tableManagementServer != null && tableManagementServer.isServing()) {
+      LOG.info("Stopping table management server...");
       tableManagementServer.stop();
     }
-    if (optimizingServiceServer != null) {
+    if (optimizingServiceServer != null && 
optimizingServiceServer.isServing()) {
+      LOG.info("Stopping optimizing server...");
       optimizingServiceServer.stop();
     }
     if (httpServer != null) {
+      LOG.info("Stopping http server...");
       try {
         httpServer.close();
       } catch (Exception e) {
@@ -179,10 +190,12 @@ public class AmoroServiceContainer {
       }
     }
     if (tableService != null) {
+      LOG.info("Stopping table service...");
       tableService.dispose();
       tableService = null;
     }
     if (terminalManager != null) {
+      LOG.info("Stopping terminal manager...");
       terminalManager.dispose();
       terminalManager = null;
     }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
index 8e244b832..2e3000448 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
@@ -69,7 +69,7 @@ public class TerminalManager {
   private final Object sessionMapLock = new Object();
   private final Map<String, TerminalSessionContext> sessionMap = 
Maps.newHashMap();
   private final Thread gcThread;
-  private boolean stop = false;
+  private volatile boolean running = true;
 
   private final ThreadPoolExecutor executionPool =
       new ThreadPoolExecutor(
@@ -237,7 +237,10 @@ public class TerminalManager {
   }
 
   public void dispose() {
-    stop = true;
+    if (!running) {
+      return;
+    }
+    running = false;
     if (gcThread != null) {
       gcThread.interrupt();
     }
@@ -399,7 +402,7 @@ public class TerminalManager {
       LOG.info(
           "Terminal Session Clean Task, check interval: " + 
SESSION_TIMEOUT_CHECK_INTERVAL + " ms");
       LOG.info("Terminal Session Timeout: {} minutes", sessionTimeout);
-      while (!stop) {
+      while (running) {
         try {
           List<TerminalSessionContext> sessionToRelease = checkIdleSession();
           sessionToRelease.forEach(this::releaseSession);

Reply via email to