This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 089d3cbf9 [#1335][0.8] fix(server, netty): release bytebuf explicitly
when requiredId is expired and cache failed #1357 (#1371)
089d3cbf9 is described below
commit 089d3cbf9190f41f68725602ccd6983bf71f8caa
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Dec 14 10:11:27 2023 +0800
[#1335][0.8] fix(server, netty): release bytebuf explicitly when requiredId
is expired and cache failed #1357 (#1371)
### What changes were proposed in this pull request?
Release bytebuf explicitly when requiredId is expired and cache failure
occurs
### Why are the changes needed?
Fix: #1335
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Internal production tests.
---
.../uniffle/server/netty/ShuffleServerNettyHandler.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 30a90623b..6bc531802 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -18,6 +18,8 @@
package org.apache.uniffle.server.netty;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -122,6 +124,10 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
boolean isPreAllocated = info != null;
if (!isPreAllocated) {
+ req.getPartitionToBlocks().values().stream()
+ .flatMap(Collection::stream)
+ .forEach(block -> block.getData().release());
+
String errorMsg =
"Can't find requireBufferId["
+ requireBufferId
@@ -140,7 +146,13 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
long alreadyReleasedSize = 0;
+ boolean isFailureOccurs = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
+ // Once the cache failure occurs, we should explicitly release data
held by byteBuf
+ if (isFailureOccurs) {
+ Arrays.stream(spd.getBlockList()).forEach(block ->
block.getData().release());
+ continue;
+ }
String shuffleDataInfo =
"appId["
+ appId
@@ -159,7 +171,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ ret;
LOG.error(errorMsg);
responseMessage = errorMsg;
- break;
+ isFailureOccurs = true;
} else {
long toReleasedSize = spd.getTotalBlockSize();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
@@ -177,7 +189,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
LOG.error(errorMsg);
- break;
+ isFailureOccurs = true;
}
}
// since the required buffer id is only used once, the shuffle client
would try to require