This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e488a54c [#1101] improvement(tez): Release server resources as soon as 
possible in RssDAGAppMaster. (#1102)
e488a54c is described below

commit e488a54c7d2f1531f8620cdb28b6d29c78e93567
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon Aug 7 18:41:14 2023 +0800

    [#1101] improvement(tez): Release server resources as soon as possible in 
RssDAGAppMaster. (#1102)
    
    ### What changes were proposed in this pull request?
    
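    Previously, shuffle server resources were released only when the JVM was about to exit, so in some scenarios they stayed occupied for a long time, for example when the history logging service and timeline service are enabled. This patch releases them in `RssDAGAppMaster.serviceStop()` and in a shutdown hook that runs before the default DAGAppMaster hook, so they are freed as soon as the application master stops.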
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1101
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Covered by existing tests.
---
 .../org/apache/tez/dag/app/RssDAGAppMaster.java    | 84 ++++++++++++++--------
 .../RssDAGAppMasterForWordCountWithFailures.java   |  3 +-
 2 files changed, 56 insertions(+), 31 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index dc498b5f..8249cde9 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -92,11 +92,16 @@ import static 
org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESC
 
 public class RssDAGAppMaster extends DAGAppMaster {
   private static final Logger LOG = 
LoggerFactory.getLogger(RssDAGAppMaster.class);
+
+  // RSS_SHUTDOWN_HOOK_PRIORITY is higher than SHUTDOWN_HOOK_PRIORITY(30) and 
will execute rss
+  // shutdown hook first.
+  public static final int RSS_SHUTDOWN_HOOK_PRIORITY = 50;
+
   private ShuffleWriteClient shuffleWriteClient;
   private TezRemoteShuffleManager tezRemoteShuffleManager;
   private Map<String, String> clusterClientConf;
 
-  final ScheduledExecutorService scheduledExecutorService =
+  final ScheduledExecutorService heartBeatExecutorService =
       
Executors.newSingleThreadScheduledExecutor(ThreadUtils.getThreadFactory("AppHeartbeat"));
 
   public RssDAGAppMaster(
@@ -187,7 +192,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
     long heartbeatTimeout = conf.getLong(RssTezConfig.RSS_HEARTBEAT_TIMEOUT, 
heartbeatInterval / 2);
     client.registerApplicationInfo(strAppAttemptId, heartbeatTimeout, "user");
 
-    appMaster.scheduledExecutorService.scheduleAtFixedRate(
+    appMaster.heartBeatExecutorService.scheduleAtFixedRate(
         () -> {
           try {
             client.sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
@@ -238,6 +243,52 @@ public class RssDAGAppMaster extends DAGAppMaster {
     return dag;
   }
 
+  @Override
+  public void serviceStop() throws Exception {
+    releaseRssResources(this);
+    super.serviceStop();
+  }
+
+  static class RssDAGAppMasterShutdownHook implements Runnable {
+    RssDAGAppMaster appMaster;
+
+    RssDAGAppMasterShutdownHook(RssDAGAppMaster appMaster) {
+      this.appMaster = appMaster;
+    }
+
+    @Override
+    public void run() {
+      releaseRssResources(appMaster);
+
+      LOG.info(
+          "RssDAGAppMaster received a signal. Signaling RMCommunicator and 
JobHistoryEventHandler.");
+      this.appMaster.stop();
+    }
+  }
+
+  static void releaseRssResources(RssDAGAppMaster appMaster) {
+    try {
+      LOG.info("RssDAGAppMaster releaseRssResources invoked");
+      appMaster.heartBeatExecutorService.shutdownNow();
+
+      if (appMaster.shuffleWriteClient != null) {
+        appMaster.shuffleWriteClient.close();
+      }
+      appMaster.shuffleWriteClient = null;
+
+      if (appMaster.tezRemoteShuffleManager != null) {
+        try {
+          appMaster.tezRemoteShuffleManager.shutdown();
+        } catch (Exception e) {
+          LOG.info("Failed to shutdown TezRemoteShuffleManager.", e);
+        }
+      }
+      appMaster.tezRemoteShuffleManager = null;
+    } catch (Throwable t) {
+      LOG.error("Failed to release Rss resources.", t);
+    }
+  }
+
   /**
    * main method
    *
@@ -348,7 +399,7 @@ public class RssDAGAppMaster extends DAGAppMaster {
       ShutdownHookManager.get()
           .addShutdownHook(new DAGAppMasterShutdownHook(appMaster), 
SHUTDOWN_HOOK_PRIORITY);
       ShutdownHookManager.get()
-          .addShutdownHook(new RssDAGAppMasterShutdownHook(appMaster), 
SHUTDOWN_HOOK_PRIORITY);
+          .addShutdownHook(new RssDAGAppMasterShutdownHook(appMaster), 
RSS_SHUTDOWN_HOOK_PRIORITY);
 
       // log the system properties
       if (LOG.isInfoEnabled()) {
@@ -383,33 +434,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
     }
   }
 
-  static class RssDAGAppMasterShutdownHook implements Runnable {
-    RssDAGAppMaster appMaster;
-
-    RssDAGAppMasterShutdownHook(RssDAGAppMaster appMaster) {
-      this.appMaster = appMaster;
-    }
-
-    @Override
-    public void run() {
-      if (appMaster.shuffleWriteClient != null) {
-        appMaster.shuffleWriteClient.close();
-      }
-
-      if (appMaster.tezRemoteShuffleManager != null) {
-        try {
-          appMaster.tezRemoteShuffleManager.shutdown();
-        } catch (Exception e) {
-          RssDAGAppMaster.LOG.info("TezRemoteShuffleManager shutdown error: " 
+ e.getMessage());
-        }
-      }
-
-      RssDAGAppMaster.LOG.info(
-          "RssDAGAppMaster received a signal. Signaling RMCommunicator and 
JobHistoryEventHandler.");
-      this.appMaster.stop();
-    }
-  }
-
   @VisibleForTesting
   public static void registerStateEnteredCallback(DAGImpl dag, RssDAGAppMaster 
appMaster) {
     StateMachineTez stateMachine = (StateMachineTez) getPrivateField(dag, 
"stateMachine");
diff --git 
a/integration-test/tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterForWordCountWithFailures.java
 
b/integration-test/tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterForWordCountWithFailures.java
index 9637a2f8..457fbbd6 100644
--- 
a/integration-test/tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterForWordCountWithFailures.java
+++ 
b/integration-test/tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterForWordCountWithFailures.java
@@ -241,7 +241,8 @@ public class RssDAGAppMasterForWordCountWithFailures 
extends RssDAGAppMaster {
               testMode);
       ShutdownHookManager.get()
           .addShutdownHook(
-              new RssDAGAppMaster.RssDAGAppMasterShutdownHook(appMaster), 
SHUTDOWN_HOOK_PRIORITY);
+              new RssDAGAppMaster.RssDAGAppMasterShutdownHook(appMaster),
+              RSS_SHUTDOWN_HOOK_PRIORITY);
 
       // log the system properties
       if (LOG.isInfoEnabled()) {

Reply via email to