This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 453e6d694 [#1335] fix(server)(netty): release bytebuf explicitly when 
requiredId is expired or cache failed (#1357)
453e6d694 is described below

commit 453e6d6948dd1bcc2f4f3425b493d7288931ffef
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Dec 8 11:06:57 2023 +0800

    [#1335] fix(server)(netty): release bytebuf explicitly when requiredId is 
expired or cache failed (#1357)
    
    ### What changes were proposed in this pull request?
    
    Release the ByteBuf explicitly when the `requiredId` has expired or a cache 
failure occurs.
    
    ### Why are the changes needed?
    
    Fix: #1335
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal production tests.
---
 .../uniffle/server/netty/ShuffleServerNettyHandler.java | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index e4b469152..ae02ba2d1 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.server.netty;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -121,6 +123,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       PreAllocatedBufferInfo info = 
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
       boolean isPreAllocated = info != null;
       if (!isPreAllocated) {
+        req.getPartitionToBlocks().values().stream()
+            .flatMap(Collection::stream)
+            .forEach(block -> block.getData().release());
+
         String errorMsg =
             "Can't find requireBufferId["
                 + requireBufferId
@@ -139,7 +145,14 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       final long start = System.currentTimeMillis();
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
       long alreadyReleasedSize = 0;
+      boolean isFailureOccurs = false;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
+        // Once the cache failure occurs, we should explicitly release data 
held by byteBuf
+        if (isFailureOccurs) {
+          Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
+          continue;
+        }
+
         String shuffleDataInfo =
             "appId["
                 + appId
@@ -158,7 +171,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     + ret;
             LOG.error(errorMsg);
             responseMessage = errorMsg;
-            break;
+            isFailureOccurs = true;
           } else {
             long toReleasedSize = spd.getTotalBlockSize();
             // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
@@ -176,7 +189,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           ret = StatusCode.INTERNAL_ERROR;
           responseMessage = errorMsg;
           LOG.error(errorMsg);
-          break;
+          isFailureOccurs = true;
         }
       }
       // since the required buffer id is only used once, the shuffle client 
would try to require

Reply via email to