This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 110531df [#1115] improvement(tez&mr): Unregister shuffle explicitly
when application is stopped. (#1131)
110531df is described below
commit 110531df628b03569284fa8db7f73550e7d81e62
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 11 15:29:38 2023 +0800
[#1115] improvement(tez&mr): Unregister shuffle explicitly when application
is stopped. (#1131)
### What changes were proposed in this pull request?
For MR/Tez, when an application is stopped, we should unregister its shuffles
explicitly. There is no need to wait for the timeout.
### Why are the changes needed?
Fix: #1115
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in a YARN cluster.
---
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 19 +++++++++++++++++--
.../java/org/apache/tez/dag/app/RssDAGAppMaster.java | 18 +++++-------------
.../apache/tez/dag/app/TezRemoteShuffleManager.java | 12 ++++++++++++
3 files changed, 34 insertions(+), 15 deletions(-)
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 626e88e8..d3fe6b09 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -90,6 +90,7 @@ public class RssMRAppMaster extends MRAppMaster {
private final int rssNmHttpPort;
private final ContainerId rssContainerID;
private RssContainerAllocatorRouter rssContainerAllocator;
+ private ShuffleWriteClient shuffleWriteClient;
public RssMRAppMaster(
ApplicationAttemptId applicationAttemptId,
@@ -97,7 +98,8 @@ public class RssMRAppMaster extends MRAppMaster {
String nmHost,
int nmPort,
int nmHttpPort,
- long appSubmitTime) {
+ long appSubmitTime,
+ ShuffleWriteClient client) {
super(
applicationAttemptId,
containerId,
@@ -111,20 +113,32 @@ public class RssMRAppMaster extends MRAppMaster {
rssNmHttpPort = nmHttpPort;
rssContainerID = containerId;
rssContainerAllocator = null;
+ shuffleWriteClient = client;
}
private static final Logger LOG =
LoggerFactory.getLogger(RssMRAppMaster.class);
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Unregister shuffle for app {}", this.getAttemptID());
+ if (shuffleWriteClient != null) {
+ shuffleWriteClient.unregisterShuffle(getAttemptID().toString(), 0);
+ }
+ super.serviceStop();
+ }
+
public static void main(String[] args) {
JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+ ShuffleWriteClient shuffleWriteClient = null;
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (numReduceTasks > 0) {
String coordinators = conf.get(RssMRConfig.RSS_COORDINATOR_QUORUM);
ShuffleWriteClient client = RssMRUtils.createShuffleClient(conf);
+ shuffleWriteClient = client;
LOG.info("Registering coordinators {}", coordinators);
client.registerCoordinators(coordinators);
@@ -369,7 +383,8 @@ public class RssMRAppMaster extends MRAppMaster {
nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString),
- appSubmitTime);
+ appSubmitTime,
+ shuffleWriteClient);
ShutdownHookManager.get().addShutdownHook(new
RssMRAppMasterShutdownHook(appMaster), 30);
MRWebAppUtil.initialize(conf);
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 9c7e6652..281c53aa 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -289,8 +289,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
@Override
public void run() {
- releaseRssResources(appMaster);
-
LOG.info(
"RssDAGAppMaster received a signal. Signaling RMCommunicator and
JobHistoryEventHandler.");
this.appMaster.stop();
@@ -301,20 +299,14 @@ public class RssDAGAppMaster extends DAGAppMaster {
try {
LOG.info("RssDAGAppMaster releaseRssResources invoked");
appMaster.heartBeatExecutorService.shutdownNow();
-
+ if (appMaster.tezRemoteShuffleManager != null) {
+ appMaster.tezRemoteShuffleManager.shutdown();
+ appMaster.tezRemoteShuffleManager = null;
+ }
if (appMaster.shuffleWriteClient != null) {
appMaster.shuffleWriteClient.close();
+ appMaster.shuffleWriteClient = null;
}
- appMaster.shuffleWriteClient = null;
-
- if (appMaster.tezRemoteShuffleManager != null) {
- try {
- appMaster.tezRemoteShuffleManager.shutdown();
- } catch (Exception e) {
- LOG.info("Failed to shutdown TezRemoteShuffleManager.", e);
- }
- }
- appMaster.tezRemoteShuffleManager = null;
} catch (Throwable t) {
LOG.error("Failed to release Rss resources.", t);
}
diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index d1998eeb..808810e1 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -109,6 +109,14 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
@Override
public void shutdown() throws Exception {
+ if (rssClient != null) {
+ LOG.info("unregister all shuffle for appid {}", appId);
+ Map<Integer, ShuffleAssignmentsInfo> infos =
+ tezRemoteShuffleUmbilical.getShuffleIdToShuffleAssignsInfo();
+ for (Map.Entry<Integer, ShuffleAssignmentsInfo> entry :
infos.entrySet()) {
+ rssClient.unregisterShuffle(appId, entry.getKey());
+ }
+ }
server.stop();
}
@@ -173,6 +181,10 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
return response;
}
+
+ Map<Integer, ShuffleAssignmentsInfo> getShuffleIdToShuffleAssignsInfo() {
+ return shuffleIdToShuffleAssignsInfo;
+ }
}
private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int
shuffleId) {