This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a8c2ebd8 [#1391] fix(server): Direct memory may leak in exceptional scenarios in shuffle server. (#1392)
5a8c2ebd8 is described below

commit 5a8c2ebd89911cb8656327a8daad0e863d842c7f
Author: RickyMa <[email protected]>
AuthorDate: Mon Dec 25 10:01:34 2023 +0800

    [#1391] fix(server): Direct memory may leak in exceptional scenarios in shuffle server. (#1392)
    
    ### What changes were proposed in this pull request?
    
    Release the direct memory in the finally block to prevent memory leaks.
    
    ### Why are the changes needed?
    
    For [#1391](https://github.com/apache/incubator-uniffle/issues/1391)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    By running the server successfully.
---
 .../uniffle/server/netty/ShuffleServerNettyHandler.java    | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ae02ba2d1..184cde000 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -147,12 +147,6 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       long alreadyReleasedSize = 0;
       boolean isFailureOccurs = false;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
-        // Once the cache failure occurs, we should explicitly release data 
held by byteBuf
-        if (isFailureOccurs) {
-          Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
-          continue;
-        }
-
         String shuffleDataInfo =
             "appId["
                 + appId
@@ -162,6 +156,9 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
                 + spd.getPartitionId()
                 + "]";
         try {
+          if (isFailureOccurs) {
+            continue;
+          }
           ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, 
spd);
           if (ret != StatusCode.SUCCESS) {
             String errorMsg =
@@ -190,6 +187,11 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
           responseMessage = errorMsg;
           LOG.error(errorMsg);
           isFailureOccurs = true;
+        } finally {
+          // Once the cache failure occurs, we should explicitly release data 
held by byteBuf
+          if (isFailureOccurs) {
+            Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
+          }
         }
       }
       // since the required buffer id is only used once, the shuffle client 
would try to require

Reply via email to