This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.7 by this push:
new 19fe80c1 [#715] fix(mr): The container does not exit because
shuffleclient is not closed (#882)
19fe80c1 is described below
commit 19fe80c121a9ff2965c38cbfaa0e86347c0fb1e6
Author: zhaobing <[email protected]>
AuthorDate: Fri May 19 14:30:29 2023 +0800
[#715] fix(mr): The container does not exit because shuffleclient is not
closed (#882)
### What changes were proposed in this pull request?
The container does not exit because the ShuffleWriteClient is not closed.
### Why are the changes needed?
For #715
1. The process does not exit after a map task or reduce task finishes
executing. The reason is that ShuffleWriteClient owns a thread pool that is
not shut down when the task completes. Closing the ShuffleWriteClient when the
task finishes solves this problem.
2. How to reproduce the problem:
Set up a small cluster and submit an MR job whose requested resources
exceed the total resources of the cluster.
All tasks finish executing but do not exit. Once the 60-second timeout
(mapreduce.task.exit.timeout) elapses, the ApplicationMaster asks the
NodeManager to kill the corresponding container.
The ApplicationMaster log shows the following diagnostics for the killed attempt:
```
2023-03-12 13:56:45,901 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1676901654399_1653119_m_000070_0: [2023-03-12 13:56:44.909]Container killed by the ApplicationMaster.
[2023-03-12 13:56:44.921]Sent signal OUTPUT_THREAD_DUMP (SIGQUIT) to pid 45556 as user tc_infra for container container_e304_1676901654399_1653119_01_000072, result=success
[2023-03-12 13:56:44.985]Container killed on request. Exit code is 143
[2023-03-12 13:56:45.403]Container exited with a non-zero exit code 143.
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Co-authored-by: zhaobing <[email protected]>
---
.../main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index c9cb553f..7d7b1806 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -36,6 +36,7 @@ import
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ByteUnit;
@@ -52,6 +53,7 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
private int partitions;
private SortWriteBufferManager bufferManager;
+ private ShuffleWriteClient shuffleClient;
@Override
public void init(Context context) throws IOException, ClassNotFoundException
{
@@ -107,6 +109,7 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
long maxBufferSize = RssMRUtils.getLong(rssJobConf, mrJobConf,
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
+ shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
bufferManager = new SortWriteBufferManager(
(long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
taskAttemptId,
@@ -116,7 +119,7 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
comparator,
memoryThreshold,
appId,
- RssMRUtils.createShuffleClient(mrJobConf),
+ shuffleClient,
sendCheckInterval,
sendCheckTimeout,
partitionToServers,
@@ -188,6 +191,7 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
public void close() throws IOException, InterruptedException {
reporter.progress();
bufferManager.freeAllResources();
+ shuffleClient.close();
}
@Override