This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 453e6d694 [#1335] fix(server)(netty): release bytebuf explicitly when
requiredId is expired or cache failed (#1357)
453e6d694 is described below
commit 453e6d6948dd1bcc2f4f3425b493d7288931ffef
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Dec 8 11:06:57 2023 +0800
[#1335] fix(server)(netty): release bytebuf explicitly when requiredId is
expired or cache failed (#1357)
### What changes were proposed in this pull request?
Release the ByteBuf explicitly when the `requiredId` has expired or when a
cache failure occurs.
### Why are the changes needed?
Fix: #1335
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal production tests.
---
.../uniffle/server/netty/ShuffleServerNettyHandler.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index e4b469152..ae02ba2d1 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.server.netty;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -121,6 +123,10 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
boolean isPreAllocated = info != null;
if (!isPreAllocated) {
+ req.getPartitionToBlocks().values().stream()
+ .flatMap(Collection::stream)
+ .forEach(block -> block.getData().release());
+
String errorMsg =
"Can't find requireBufferId["
+ requireBufferId
@@ -139,7 +145,14 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
long alreadyReleasedSize = 0;
+ boolean isFailureOccurs = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
+ // Once the cache failure occurs, we should explicitly release data
held by byteBuf
+ if (isFailureOccurs) {
+ Arrays.stream(spd.getBlockList()).forEach(block ->
block.getData().release());
+ continue;
+ }
+
String shuffleDataInfo =
"appId["
+ appId
@@ -158,7 +171,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ ret;
LOG.error(errorMsg);
responseMessage = errorMsg;
- break;
+ isFailureOccurs = true;
} else {
long toReleasedSize = spd.getTotalBlockSize();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
@@ -176,7 +189,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
LOG.error(errorMsg);
- break;
+ isFailureOccurs = true;
}
}
// since the required buffer id is only used once, the shuffle client
would try to require