This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new 19fe80c1 [#715] fix(mr): The container does not exit because 
shuffleclient is not closed (#882)
19fe80c1 is described below

commit 19fe80c121a9ff2965c38cbfaa0e86347c0fb1e6
Author: zhaobing <[email protected]>
AuthorDate: Fri May 19 14:30:29 2023 +0800

    [#715] fix(mr): The container does not exit because shuffleclient is not 
closed (#882)
    
    ### What changes were proposed in this pull request?
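    Keep a reference to the ShuffleWriteClient created in RssMapOutputCollector.init() and close it in RssMapOutputCollector.close(), so that the task container can exit once the task has finished. Previously the client was never closed, which kept the container alive.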
    
    ### Why are the changes needed?
    
    For #715
    
    1. The process does not exit after a map or reduce task has completed. This happens because ShuffleWriteClient owns a thread pool that is not shut down when the task completes. Closing the ShuffleWriteClient solves this problem.
    
    2. How to reproduce:
    Set up a small cluster and submit an MR job whose requested resources exceed the total resources of the cluster.
    All tasks finish executing, but their processes do not exit until the 60-second timeout (mapreduce.task.exit.timeout) expires. At that point the ApplicationMaster asks the NodeManager to kill the corresponding container.
    
    The resulting diagnostics (as reported for the task attempt in the ApplicationMaster log) are as follows:
    `2023-03-12 13:56:45,901 INFO [AsyncDispatcher event handler] 
org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report 
from attempt_1676901654399_1653119_m_000070_0: [2023-03-12 
13:56:44.909]Container killed by the ApplicationMaster.
    [2023-03-12 13:56:44.921]Sent signal OUTPUT_THREAD_DUMP (SIGQUIT) to pid 
45556 as user tc_infra for container 
container_e304_1676901654399_1653119_01_000072, result=success
    [2023-03-12 13:56:44.985]Container killed on request. Exit code is 143
    [2023-03-12 13:56:45.403]Container exited with a non-zero exit code 143.
    `
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Covered by the existing unit tests.
    
    Co-authored-by: zhaobing <[email protected]>
---
 .../main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java 
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index c9cb553f..7d7b1806 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -36,6 +36,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ByteUnit;
@@ -52,6 +53,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
   private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
   private int partitions;
   private SortWriteBufferManager bufferManager;
+  private ShuffleWriteClient shuffleClient;
 
   @Override
   public void init(Context context) throws IOException, ClassNotFoundException 
{
@@ -107,6 +109,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
         RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
     long maxBufferSize = RssMRUtils.getLong(rssJobConf, mrJobConf, 
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
         RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
+    shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
     bufferManager = new SortWriteBufferManager(
         (long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
         taskAttemptId,
@@ -116,7 +119,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
         comparator,
         memoryThreshold,
         appId,
-        RssMRUtils.createShuffleClient(mrJobConf),
+        shuffleClient,
         sendCheckInterval,
         sendCheckTimeout,
         partitionToServers,
@@ -188,6 +191,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
   public void close() throws IOException, InterruptedException {
     reporter.progress();
     bufferManager.freeAllResources();
+    shuffleClient.close();
   }
 
   @Override

Reply via email to