This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9723e9df [#1132] improvement(spark): Unregister shuffle explicitly when Spark application is stopped. #1139
9723e9df is described below

commit 9723e9df548359e6d6900866130354a90650308b
Author: Fantasy-Jay <[email protected]>
AuthorDate: Tue Aug 15 16:01:16 2023 +0800

    [#1132] improvement(spark): Unregister shuffle explicitly when Spark application is stopped. #1139
    
    ### What changes were proposed in this pull request?
    
    Unregister shuffle explicitly when Spark application is stopped.
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1132
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added test cases for `unregisterShuffle(String appId)` in `ShuffleWriteClientImplTest`.
---
 .../org/apache/hadoop/mapred/SortWriteBufferManagerTest.java     | 3 +++
 .../org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java     | 3 +++
 .../main/java/org/apache/spark/shuffle/RssShuffleManager.java    | 6 +++++-
 .../main/java/org/apache/spark/shuffle/RssShuffleManager.java    | 2 ++
 .../library/common/sort/buffer/WriteBufferManagerTest.java       | 3 +++
 .../java/org/apache/uniffle/client/api/ShuffleWriteClient.java   | 2 ++
 .../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java   | 9 +++++++++
 .../apache/uniffle/client/impl/ShuffleWriteClientImplTest.java   | 6 ++++++
 8 files changed, 33 insertions(+), 1 deletion(-)

diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index ae89231b..0cd9e1a0 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -496,5 +496,8 @@ public class SortWriteBufferManagerTest {
 
     @Override
     public void unregisterShuffle(String appId, int shuffleId) {}
+
+    @Override
+    public void unregisterShuffle(String appId) {}
   }
 }
diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index b171fa9e..f0297540 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -560,6 +560,9 @@ public class FetcherTest {
 
     @Override
     public void unregisterShuffle(String appId, int shuffleId) {}
+
+    @Override
+    public void unregisterShuffle(String appId) {}
   }
 
   static class MockedShuffleReadClient implements ShuffleReadClient {
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d20fca58..e38cc570 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -540,7 +540,11 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
         LOG.warn("Errors on closing data pusher", e);
       }
     }
-    shuffleWriteClient.close();
+    if (shuffleWriteClient != null) {
+      // Unregister shuffle before closing shuffle write client.
+      shuffleWriteClient.unregisterShuffle(appId);
+      shuffleWriteClient.close();
+    }
   }
 
   @Override
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d6919697..4605ab04 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -772,6 +772,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       heartBeatScheduledExecutorService.shutdownNow();
     }
     if (shuffleWriteClient != null) {
+      // Unregister shuffle before closing shuffle write client.
+      shuffleWriteClient.unregisterShuffle(getAppId());
       shuffleWriteClient.close();
     }
     if (dataPusher != null) {
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index d9036d68..1ebcc77a 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -655,5 +655,8 @@ public class WriteBufferManagerTest {
 
     @Override
     public void unregisterShuffle(String appId, int shuffleId) {}
+
+    @Override
+    public void unregisterShuffle(String appId) {}
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 39931646..ce7c5378 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -95,4 +95,6 @@ public interface ShuffleWriteClient {
   void close();
 
   void unregisterShuffle(String appId, int shuffleId);
+
+  void unregisterShuffle(String appId);
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 9c5cb535..d261b35b 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -995,6 +995,15 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     }
   }
 
+  @Override
+  public void unregisterShuffle(String appId) {
+    Map<Integer, Set<ShuffleServerInfo>> appServerMap = 
shuffleServerInfoMap.get(appId);
+    if (appServerMap == null) {
+      return;
+    }
+    appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, 
shuffleId));
+  }
+
   private void throwExceptionIfNecessary(ClientResponse response, String 
errorMsg) {
     if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
       LOG.error(errorMsg);
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 4513f746..ac4ed009 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -116,6 +116,12 @@ public class ShuffleWriteClientImplTest {
     shuffleWriteClient.addShuffleServer(appId1, 1, server1);
     shuffleWriteClient.unregisterShuffle(appId1, 1);
     assertEquals(1, shuffleWriteClient.getAllShuffleServers(appId1).size());
+    shuffleWriteClient.unregisterShuffle(appId1);
+    assertEquals(0, shuffleWriteClient.getAllShuffleServers(appId1).size());
+    shuffleWriteClient.addShuffleServer(appId2, 2, server1);
+    assertEquals(2, shuffleWriteClient.getAllShuffleServers(appId2).size());
+    shuffleWriteClient.unregisterShuffle(appId2);
+    assertEquals(0, shuffleWriteClient.getAllShuffleServers(appId2).size());
   }
 
   @Test

Reply via email to