This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5a8c2ebd8 [#1391] fix(server): Direct memory may leak in exceptional
scenarios in shuffle server. (#1392)
5a8c2ebd8 is described below
commit 5a8c2ebd89911cb8656327a8daad0e863d842c7f
Author: RickyMa <[email protected]>
AuthorDate: Mon Dec 25 10:01:34 2023 +0800
[#1391] fix(server): Direct memory may leak in exceptional scenarios in
shuffle server. (#1392)
### What changes were proposed in this pull request?
Release the direct memory in a `finally` block to prevent memory leaks.
### Why are the changes needed?
For [#1391](https://github.com/apache/incubator-uniffle/issues/1391)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the server successfully.
---
.../uniffle/server/netty/ShuffleServerNettyHandler.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ae02ba2d1..184cde000 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -147,12 +147,6 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
long alreadyReleasedSize = 0;
boolean isFailureOccurs = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
- // Once the cache failure occurs, we should explicitly release data
held by byteBuf
- if (isFailureOccurs) {
- Arrays.stream(spd.getBlockList()).forEach(block ->
block.getData().release());
- continue;
- }
-
String shuffleDataInfo =
"appId["
+ appId
@@ -162,6 +156,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ spd.getPartitionId()
+ "]";
try {
+ if (isFailureOccurs) {
+ continue;
+ }
ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated,
spd);
if (ret != StatusCode.SUCCESS) {
String errorMsg =
@@ -190,6 +187,11 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
responseMessage = errorMsg;
LOG.error(errorMsg);
isFailureOccurs = true;
+ } finally {
+ // Once the cache failure occurs, we should explicitly release data
held by byteBuf
+ if (isFailureOccurs) {
+ Arrays.stream(spd.getBlockList()).forEach(block ->
block.getData().release());
+ }
}
}
// since the required buffer id is only used once, the shuffle client
would try to require