This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9723e9df [#1132] improvement(spark): Unregister shuffle explicitly
when Spark application is stopped. #1139
9723e9df is described below
commit 9723e9df548359e6d6900866130354a90650308b
Author: Fantasy-Jay <[email protected]>
AuthorDate: Tue Aug 15 16:01:16 2023 +0800
[#1132] improvement(spark): Unregister shuffle explicitly when Spark
application is stopped. #1139
### What changes were proposed in this pull request?
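Unregister shuffle explicitly when Spark application is stopped: add `ShuffleWriteClient#unregisterShuffle(String appId)`, which unregisters every shuffle ID tracked for the application, and call it in the Spark 2 and Spark 3 `RssShuffleManager` before the shuffle write client is closed.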
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1132
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
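Added assertions for `unregisterShuffle(appId)` to `ShuffleWriteClientImplTest`; the test classes in client-mr and client-tez gained empty overrides of the new method.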
---
.../org/apache/hadoop/mapred/SortWriteBufferManagerTest.java | 3 +++
.../org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java | 3 +++
.../main/java/org/apache/spark/shuffle/RssShuffleManager.java | 6 +++++-
.../main/java/org/apache/spark/shuffle/RssShuffleManager.java | 2 ++
.../library/common/sort/buffer/WriteBufferManagerTest.java | 3 +++
.../java/org/apache/uniffle/client/api/ShuffleWriteClient.java | 2 ++
.../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 9 +++++++++
.../apache/uniffle/client/impl/ShuffleWriteClientImplTest.java | 6 ++++++
8 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index ae89231b..0cd9e1a0 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -496,5 +496,8 @@ public class SortWriteBufferManagerTest {
@Override
public void unregisterShuffle(String appId, int shuffleId) {}
+
+ @Override
+ public void unregisterShuffle(String appId) {}
}
}
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index b171fa9e..f0297540 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -560,6 +560,9 @@ public class FetcherTest {
@Override
public void unregisterShuffle(String appId, int shuffleId) {}
+
+ @Override
+ public void unregisterShuffle(String appId) {}
}
static class MockedShuffleReadClient implements ShuffleReadClient {
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d20fca58..e38cc570 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -540,7 +540,11 @@ public class RssShuffleManager extends RssShuffleManagerBase {
LOG.warn("Errors on closing data pusher", e);
}
}
- shuffleWriteClient.close();
+ if (shuffleWriteClient != null) {
+ // Unregister shuffle before closing shuffle write client.
+ shuffleWriteClient.unregisterShuffle(appId);
+ shuffleWriteClient.close();
+ }
}
@Override
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d6919697..4605ab04 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -772,6 +772,8 @@ public class RssShuffleManager extends RssShuffleManagerBase {
heartBeatScheduledExecutorService.shutdownNow();
}
if (shuffleWriteClient != null) {
+ // Unregister shuffle before closing shuffle write client.
+ shuffleWriteClient.unregisterShuffle(getAppId());
shuffleWriteClient.close();
}
if (dataPusher != null) {
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index d9036d68..1ebcc77a 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -655,5 +655,8 @@ public class WriteBufferManagerTest {
@Override
public void unregisterShuffle(String appId, int shuffleId) {}
+
+ @Override
+ public void unregisterShuffle(String appId) {}
}
}
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 39931646..ce7c5378 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -95,4 +95,6 @@ public interface ShuffleWriteClient {
void close();
void unregisterShuffle(String appId, int shuffleId);
+
+ void unregisterShuffle(String appId);
}
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 9c5cb535..d261b35b 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -995,6 +995,15 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
}
}
+ @Override
+ public void unregisterShuffle(String appId) {
+ Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
+ if (appServerMap == null) {
+ return;
+ }
+ appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, shuffleId));
+ }
+
private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) {
if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error(errorMsg);
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 4513f746..ac4ed009 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -116,6 +116,12 @@ public class ShuffleWriteClientImplTest {
shuffleWriteClient.addShuffleServer(appId1, 1, server1);
shuffleWriteClient.unregisterShuffle(appId1, 1);
assertEquals(1, shuffleWriteClient.getAllShuffleServers(appId1).size());
+ shuffleWriteClient.unregisterShuffle(appId1);
+ assertEquals(0, shuffleWriteClient.getAllShuffleServers(appId1).size());
+ shuffleWriteClient.addShuffleServer(appId2, 2, server1);
+ assertEquals(2, shuffleWriteClient.getAllShuffleServers(appId2).size());
+ shuffleWriteClient.unregisterShuffle(appId2);
+ assertEquals(0, shuffleWriteClient.getAllShuffleServers(appId2).size());
}
@Test