This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 110531df [#1115] improvement(tez&mr): Unregister shuffle explicitly when application is stopped. (#1131)
110531df is described below

commit 110531df628b03569284fa8db7f73550e7d81e62
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 11 15:29:38 2023 +0800

    [#1115] improvement(tez&mr): Unregister shuffle explicitly when application is stopped. (#1131)
    
    ### What changes were proposed in this pull request?
    
    For MR/Tez, when an application is stopped, we should unregister its shuffles
    explicitly; there is no need to wait for the timeout.
    
    ### Why are the changes needed?
    
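    Fix: #1115
    
    Without explicit unregistration, the shuffles of a stopped application are
    only released once the timeout expires.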
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested in a YARN cluster.
---
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java       | 19 +++++++++++++++++--
 .../java/org/apache/tez/dag/app/RssDAGAppMaster.java  | 18 +++++-------------
 .../apache/tez/dag/app/TezRemoteShuffleManager.java   | 12 ++++++++++++
 3 files changed, 34 insertions(+), 15 deletions(-)

diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 626e88e8..d3fe6b09 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -90,6 +90,7 @@ public class RssMRAppMaster extends MRAppMaster {
   private final int rssNmHttpPort;
   private final ContainerId rssContainerID;
   private RssContainerAllocatorRouter rssContainerAllocator;
+  private ShuffleWriteClient shuffleWriteClient;
 
   public RssMRAppMaster(
       ApplicationAttemptId applicationAttemptId,
@@ -97,7 +98,8 @@ public class RssMRAppMaster extends MRAppMaster {
       String nmHost,
       int nmPort,
       int nmHttpPort,
-      long appSubmitTime) {
+      long appSubmitTime,
+      ShuffleWriteClient client) {
     super(
         applicationAttemptId,
         containerId,
@@ -111,20 +113,32 @@ public class RssMRAppMaster extends MRAppMaster {
     rssNmHttpPort = nmHttpPort;
     rssContainerID = containerId;
     rssContainerAllocator = null;
+    shuffleWriteClient = client;
   }
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RssMRAppMaster.class);
 
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Unregister shuffle for app {}", this.getAttemptID());
+    if (shuffleWriteClient != null) {
+      shuffleWriteClient.unregisterShuffle(getAttemptID().toString(), 0);
+    }
+    super.serviceStop();
+  }
+
   public static void main(String[] args) {
 
     JobConf conf = new JobConf(new YarnConfiguration());
     conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
 
+    ShuffleWriteClient shuffleWriteClient = null;
     int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
     if (numReduceTasks > 0) {
       String coordinators = conf.get(RssMRConfig.RSS_COORDINATOR_QUORUM);
 
       ShuffleWriteClient client = RssMRUtils.createShuffleClient(conf);
+      shuffleWriteClient = client;
 
       LOG.info("Registering coordinators {}", coordinators);
       client.registerCoordinators(coordinators);
@@ -369,7 +383,8 @@ public class RssMRAppMaster extends MRAppMaster {
               nodeHostString,
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodeHttpPortString),
-              appSubmitTime);
+              appSubmitTime,
+              shuffleWriteClient);
       ShutdownHookManager.get().addShutdownHook(new 
RssMRAppMasterShutdownHook(appMaster), 30);
       MRWebAppUtil.initialize(conf);
       String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 9c7e6652..281c53aa 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -289,8 +289,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
 
     @Override
     public void run() {
-      releaseRssResources(appMaster);
-
       LOG.info(
           "RssDAGAppMaster received a signal. Signaling RMCommunicator and 
JobHistoryEventHandler.");
       this.appMaster.stop();
@@ -301,20 +299,14 @@ public class RssDAGAppMaster extends DAGAppMaster {
     try {
       LOG.info("RssDAGAppMaster releaseRssResources invoked");
       appMaster.heartBeatExecutorService.shutdownNow();
-
+      if (appMaster.tezRemoteShuffleManager != null) {
+        appMaster.tezRemoteShuffleManager.shutdown();
+        appMaster.tezRemoteShuffleManager = null;
+      }
       if (appMaster.shuffleWriteClient != null) {
         appMaster.shuffleWriteClient.close();
+        appMaster.shuffleWriteClient = null;
       }
-      appMaster.shuffleWriteClient = null;
-
-      if (appMaster.tezRemoteShuffleManager != null) {
-        try {
-          appMaster.tezRemoteShuffleManager.shutdown();
-        } catch (Exception e) {
-          LOG.info("Failed to shutdown TezRemoteShuffleManager.", e);
-        }
-      }
-      appMaster.tezRemoteShuffleManager = null;
     } catch (Throwable t) {
       LOG.error("Failed to release Rss resources.", t);
     }
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index d1998eeb..808810e1 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -109,6 +109,14 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
 
   @Override
   public void shutdown() throws Exception {
+    if (rssClient != null) {
+      LOG.info("unregister all shuffle for appid {}", appId);
+      Map<Integer, ShuffleAssignmentsInfo> infos =
+          tezRemoteShuffleUmbilical.getShuffleIdToShuffleAssignsInfo();
+      for (Map.Entry<Integer, ShuffleAssignmentsInfo> entry : 
infos.entrySet()) {
+        rssClient.unregisterShuffle(appId, entry.getKey());
+      }
+    }
     server.stop();
   }
 
@@ -173,6 +181,10 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
 
       return response;
     }
+
+    Map<Integer, ShuffleAssignmentsInfo> getShuffleIdToShuffleAssignsInfo() {
+      return shuffleIdToShuffleAssignsInfo;
+    }
   }
 
   private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int 
shuffleId) {

Reply via email to