This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac76e81 [#1127] fix(netty): incorrect bytebuf release for ShuffleBlockInfo data in client side (#1150)
5ac76e81 is described below

commit 5ac76e8179a7c038855d439bfbdc9bf759f3f108
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Aug 16 18:45:55 2023 +0800

    [#1127] fix(netty): incorrect bytebuf release for ShuffleBlockInfo data in client side (#1150)
    
    ### What changes were proposed in this pull request?
    
    Block data must be released only after it has been sent.
    However, under the current implementation, all blocks are released by the last event.
    If the events are executed out of order, block data that has not yet been sent is released prematurely, which is wrong.
    
    ### Why are the changes needed?
    
    Fix: #1127
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    1. existing UTs
---
 .../spark/shuffle/writer/WriteBufferManager.java     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index b6299cd7..27fb40a2 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -408,10 +408,16 @@ public class WriteBufferManager extends MemoryConsumer {
                 + totalSize
                 + " bytes");
         // Use final temporary variables for closures
-        final long _memoryUsed = memoryUsed;
+        final long memoryUsedTemp = memoryUsed;
+        final List<ShuffleBlockInfo> shuffleBlocksTemp = 
shuffleBlockInfosPerEvent;
         events.add(
             new AddBlockEvent(
-                taskId, shuffleBlockInfosPerEvent, () -> 
freeAllocatedMemory(_memoryUsed)));
+                taskId,
+                shuffleBlockInfosPerEvent,
+                () -> {
+                  freeAllocatedMemory(memoryUsedTemp);
+                  shuffleBlocksTemp.stream().forEach(x -> 
x.getData().release());
+                }));
         shuffleBlockInfosPerEvent = Lists.newArrayList();
         totalSize = 0;
         memoryUsed = 0;
@@ -425,17 +431,15 @@ public class WriteBufferManager extends MemoryConsumer {
               + totalSize
               + " bytes");
       // Use final temporary variables for closures
-      final long _memoryUsed = memoryUsed;
-      final List<ShuffleBlockInfo> finalShuffleBlockInfosPerEvent = 
shuffleBlockInfoList;
+      final long memoryUsedTemp = memoryUsed;
+      final List<ShuffleBlockInfo> shuffleBlocksTemp = 
shuffleBlockInfosPerEvent;
       events.add(
           new AddBlockEvent(
               taskId,
               shuffleBlockInfosPerEvent,
               () -> {
-                freeAllocatedMemory(_memoryUsed);
-                for (ShuffleBlockInfo shuffleBlockInfo : 
finalShuffleBlockInfosPerEvent) {
-                  shuffleBlockInfo.getData().release();
-                }
+                freeAllocatedMemory(memoryUsedTemp);
+                shuffleBlocksTemp.stream().forEach(x -> x.getData().release());
               }));
     }
     return events;

Reply via email to