This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 089d3cbf9 [#1335][0.8] fix(server, netty): release bytebuf explicitly 
when requiredId is expired and cache failed #1357 (#1371)
089d3cbf9 is described below

commit 089d3cbf9190f41f68725602ccd6983bf71f8caa
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Dec 14 10:11:27 2023 +0800

    [#1335][0.8] fix(server, netty): release bytebuf explicitly when requiredId 
is expired and cache failed #1357 (#1371)
    
    ### What changes were proposed in this pull request?
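    Release the ByteBuf of each block explicitly in two cases: when the
    requireBufferId cannot be found (for example, because it has expired), and
    for the not-yet-processed partition data once a cache failure occurs.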
    
    ### Why are the changes needed?
    Fix: #1335
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Internal production tests.
---
 .../uniffle/server/netty/ShuffleServerNettyHandler.java  | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 30a90623b..6bc531802 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -18,6 +18,8 @@
 package org.apache.uniffle.server.netty;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -122,6 +124,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       PreAllocatedBufferInfo info = 
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
       boolean isPreAllocated = info != null;
       if (!isPreAllocated) {
+        req.getPartitionToBlocks().values().stream()
+            .flatMap(Collection::stream)
+            .forEach(block -> block.getData().release());
+
         String errorMsg =
             "Can't find requireBufferId["
                 + requireBufferId
@@ -140,7 +146,13 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       final long start = System.currentTimeMillis();
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
       long alreadyReleasedSize = 0;
+      boolean isFailureOccurs = false;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
+        // Once the cache failure occurs, we should explicitly release data 
held by byteBuf
+        if (isFailureOccurs) {
+          Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
+          continue;
+        }
         String shuffleDataInfo =
             "appId["
                 + appId
@@ -159,7 +171,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     + ret;
             LOG.error(errorMsg);
             responseMessage = errorMsg;
-            break;
+            isFailureOccurs = true;
           } else {
             long toReleasedSize = spd.getTotalBlockSize();
             // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
@@ -177,7 +189,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           ret = StatusCode.INTERNAL_ERROR;
           responseMessage = errorMsg;
           LOG.error(errorMsg);
-          break;
+          isFailureOccurs = true;
         }
       }
       // since the required buffer id is only used once, the shuffle client 
would try to require

Reply via email to