This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 762df540 [#715] fix(mr): The container does not exit because 
shuffleclient is not closed (#882)
762df540 is described below

commit 762df540f2d947c29178fd943fae6d31c3dcd042
Author: zhaobing <[email protected]>
AuthorDate: Fri May 19 14:30:29 2023 +0800

    [#715] fix(mr): The container does not exit because shuffleclient is not 
closed (#882)
    
    ### What changes were proposed in this pull request?
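    Keep a reference to the ShuffleWriteClient created by RssMapOutputCollector and close it in close(). Previously the client was never closed, so the task container did not exit.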
    
    ### Why are the changes needed?
    
    For #715
    
    1. The task process does not exit after a map task or reduce task completes,
    because ShuffleWriteClient owns a thread pool that is not shut down when the
    task finishes. Closing the ShuffleWriteClient solves this problem.
    
    2. How to reproduce:
    Start a small cluster and submit an MR job whose requested resources exceed
    the total resources of the cluster.
    All tasks finish executing, but their processes do not exit. Once the exit
    timeout of 60 seconds (mapreduce.task.exit.timeout) is exceeded, the
    ApplicationMaster asks the NodeManager to kill the corresponding container.
    
    The NodeManager logs are as follows:
    `2023-03-12 13:56:45,901 INFO [AsyncDispatcher event handler] 
org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report 
from attempt_1676901654399_1653119_m_000070_0: [2023-03-12 
13:56:44.909]Container killed by the ApplicationMaster.
    [2023-03-12 13:56:44.921]Sent signal OUTPUT_THREAD_DUMP (SIGQUIT) to pid 
45556 as user tc_infra for container 
container_e304_1676901654399_1653119_01_000072, result=success
    [2023-03-12 13:56:44.985]Container killed on request. Exit code is 143
    [2023-03-12 13:56:45.403]Container exited with a non-zero exit code 143.
    `
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Co-authored-by: zhaobing <[email protected]>
---
 .../main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java 
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 47b33728..6bd30981 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -36,6 +36,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ByteUnit;
@@ -52,6 +53,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
   private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
   private int partitions;
   private SortWriteBufferManager bufferManager;
+  private ShuffleWriteClient shuffleClient;
 
   @Override
   public void init(Context context) throws IOException, ClassNotFoundException 
{
@@ -107,6 +109,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
         RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
     long maxBufferSize = RssMRUtils.getLong(rssJobConf, mrJobConf, 
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
         RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
+    shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
     bufferManager = new SortWriteBufferManager(
         (long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
         taskAttemptId,
@@ -116,7 +119,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
         comparator,
         memoryThreshold,
         appId,
-        RssMRUtils.createShuffleClient(mrJobConf),
+        shuffleClient,
         sendCheckInterval,
         sendCheckTimeout,
         partitionToServers,
@@ -180,6 +183,7 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
   public void close() throws IOException, InterruptedException {
     reporter.progress();
     bufferManager.freeAllResources();
+    shuffleClient.close();
   }
 
   @Override

Reply via email to