This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 680b4eaa [#1152] fix: Direct memory may leak in shuffle server. (#1154)
680b4eaa is described below
commit 680b4eaa8fc19b0eff19c320bb249049b72f9c2d
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Aug 17 19:56:28 2023 +0800
[#1152] fix: Direct memory may leak in shuffle server. (#1154)
### What changes were proposed in this pull request?
Release buffer when since the buffer have copied to data of reply.
### Why are the changes needed?
Fix: #1152
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Test in cluster manually.
---
.../org/apache/uniffle/common/ShuffleDataResult.java | 6 ++++++
.../org/apache/uniffle/common/ShuffleIndexResult.java | 6 ++++++
.../apache/uniffle/server/ShuffleServerGrpcService.java | 17 ++++++++++++++---
3 files changed, 26 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index a5d983bd..35710fa8 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -93,4 +93,10 @@ public class ShuffleDataResult {
|| buffer == null
|| buffer.size() == 0;
}
+
+ public void release() {
+ if (this.buffer != null) {
+ this.buffer.release();
+ }
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index 56b9befb..2a686c44 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -69,4 +69,10 @@ public class ShuffleIndexResult {
public boolean isEmpty() {
return buffer == null || buffer.size() == 0;
}
+
+ public void release() {
+ if (this.buffer != null) {
+ this.buffer.release();
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bbaff9f5..21eb0ae8 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -590,7 +590,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleDataResponse reply = null;
- ShuffleDataResult sdr;
+ ShuffleDataResult sdr = null;
String requestInfo =
"appId["
+ appId
@@ -658,6 +658,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setRetMsg(msg)
.build();
} finally {
+ if (sdr != null) {
+ sdr.release();
+ }
shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
}
} else {
@@ -706,9 +709,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.getShuffleServerConf()
.getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
if
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize))
{
+ ShuffleIndexResult shuffleIndexResult = null;
try {
long start = System.currentTimeMillis();
- ShuffleIndexResult shuffleIndexResult =
+ shuffleIndexResult =
shuffleServer
.getShuffleTaskManager()
.getShuffleIndex(appId, shuffleId, partitionId,
partitionNumPerRange, partitionNum);
@@ -744,6 +748,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setRetMsg(msg)
.build();
} finally {
+ if (shuffleIndexResult != null) {
+ shuffleIndexResult.release();
+ }
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
}
} else {
@@ -789,6 +796,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
// todo: if can get the exact memory size?
if
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize))
{
+ ShuffleDataResult shuffleDataResult = null;
try {
Roaring64NavigableMap expectedTaskIds = null;
if (request.getSerializedExpectedTaskIdsBitmap() != null
@@ -797,7 +805,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
RssUtils.deserializeBitMap(
request.getSerializedExpectedTaskIdsBitmap().toByteArray());
}
- ShuffleDataResult shuffleDataResult =
+ shuffleDataResult =
shuffleServer
.getShuffleTaskManager()
.getInMemoryShuffleData(
@@ -843,6 +851,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setRetMsg(msg)
.build();
} finally {
+ if (shuffleDataResult != null) {
+ shuffleDataResult.release();
+ }
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
}
} else {