This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ac4c2d [MINOR] refactor: Reduce the usage of memory in the 
`ShuffleWriter` (#877)
55ac4c2d is described below

commit 55ac4c2d4b739e7698415182fa94f46f3c5198ae
Author: roryqi <[email protected]>
AuthorDate: Fri May 12 22:36:08 2023 +0800

    [MINOR] refactor: Reduce the usage of memory in the `ShuffleWriter` (#877)
    
    ### What changes were proposed in this pull request?
    We should remove each `WriterBuffer` from the map `buffers` immediately after
    calling `createShuffleBlock` to create its `ShuffleBlockInfo`, instead of
    clearing the whole map only after all blocks have been created. Otherwise, the
    compressed data and the uncompressed data are held in memory at the same time:
    the JVM's GC cannot reclaim the uncompressed data because it is still
    referenced by the map `buffers`.
    
    ### Why are the changes needed?
    To reduce memory usage in the `ShuffleWriter`.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    CI passed.
---
 .../java/org/apache/spark/shuffle/writer/WriteBufferManager.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index bbabeb50..326ccfba 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.writer;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -206,16 +207,18 @@ public class WriteBufferManager extends MemoryConsumer {
     List<ShuffleBlockInfo> result = Lists.newArrayList();
     long dataSize = 0;
     long memoryUsed = 0;
-    for (Entry<Integer, WriterBuffer> entry : buffers.entrySet()) {
+    Iterator<Entry<Integer, WriterBuffer>> iterator = 
buffers.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Integer, WriterBuffer>  entry = iterator.next();
       WriterBuffer wb = entry.getValue();
       dataSize += wb.getDataLength();
       memoryUsed += wb.getMemoryUsed();
       result.add(createShuffleBlock(entry.getKey(), wb));
+      iterator.remove();
       copyTime += wb.getCopyTime();
     }
     LOG.info("Flush total buffer for shuffleId[" + shuffleId + "] with 
allocated["
         + allocatedBytes + "], dataSize[" + dataSize + "], memoryUsed[" + 
memoryUsed + "]");
-    buffers.clear();
     return result;
   }
 

Reply via email to