This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 680b4eaa [#1152] fix: Direct memory may leak in shuffle server. (#1154)
680b4eaa is described below

commit 680b4eaa8fc19b0eff19c320bb249049b72f9c2d
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Aug 17 19:56:28 2023 +0800

    [#1152] fix: Direct memory may leak in shuffle server. (#1154)
    
    ### What changes were proposed in this pull request?
    
    Release the buffer once its contents have been copied into the reply data.
    
    ### Why are the changes needed?
    
    Fix: #1152
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested manually in a cluster.
---
 .../org/apache/uniffle/common/ShuffleDataResult.java    |  6 ++++++
 .../org/apache/uniffle/common/ShuffleIndexResult.java   |  6 ++++++
 .../apache/uniffle/server/ShuffleServerGrpcService.java | 17 ++++++++++++++---
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index a5d983bd..35710fa8 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -93,4 +93,10 @@ public class ShuffleDataResult {
         || buffer == null
         || buffer.size() == 0;
   }
+
+  public void release() {
+    if (this.buffer != null) {
+      this.buffer.release();
+    }
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index 56b9befb..2a686c44 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -69,4 +69,10 @@ public class ShuffleIndexResult {
   public boolean isEmpty() {
     return buffer == null || buffer.size() == 0;
   }
+
+  public void release() {
+    if (this.buffer != null) {
+      this.buffer.release();
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bbaff9f5..21eb0ae8 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -590,7 +590,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetLocalShuffleDataResponse reply = null;
-    ShuffleDataResult sdr;
+    ShuffleDataResult sdr = null;
     String requestInfo =
         "appId["
             + appId
@@ -658,6 +658,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setRetMsg(msg)
                 .build();
       } finally {
+        if (sdr != null) {
+          sdr.release();
+        }
         shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
     } else {
@@ -706,9 +709,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
             .getShuffleServerConf()
             .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize))
 {
+      ShuffleIndexResult shuffleIndexResult = null;
       try {
         long start = System.currentTimeMillis();
-        ShuffleIndexResult shuffleIndexResult =
+        shuffleIndexResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getShuffleIndex(appId, shuffleId, partitionId, 
partitionNumPerRange, partitionNum);
@@ -744,6 +748,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setRetMsg(msg)
                 .build();
       } finally {
+        if (shuffleIndexResult != null) {
+          shuffleIndexResult.release();
+        }
         
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
       }
     } else {
@@ -789,6 +796,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
     // todo: if can get the exact memory size?
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize))
 {
+      ShuffleDataResult shuffleDataResult = null;
       try {
         Roaring64NavigableMap expectedTaskIds = null;
         if (request.getSerializedExpectedTaskIdsBitmap() != null
@@ -797,7 +805,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               RssUtils.deserializeBitMap(
                   request.getSerializedExpectedTaskIdsBitmap().toByteArray());
         }
-        ShuffleDataResult shuffleDataResult =
+        shuffleDataResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getInMemoryShuffleData(
@@ -843,6 +851,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setRetMsg(msg)
                 .build();
       } finally {
+        if (shuffleDataResult != null) {
+          shuffleDataResult.release();
+        }
         
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
       }
     } else {

Reply via email to