This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0f56b822b [#1917] fix(server,netty): Fix memory leak issues when
handling SendShuffleDataRequest (#1918)
0f56b822b is described below
commit 0f56b822bc45370f23e191dd0d8f5a9b0606a9e4
Author: maobaolong <[email protected]>
AuthorDate: Thu Jul 18 02:43:52 2024 +0800
[#1917] fix(server,netty): Fix memory leak issues when handling
SendShuffleDataRequest (#1918)
### What changes were proposed in this pull request?
Extract a method to release resources including Netty buffers and memory
metrics when handling SendShuffleDataRequest.
### Why are the changes needed?
Fix: #1917
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested on our test cluster.
---
.../server/netty/ShuffleServerNettyHandler.java | 85 +++++++++++++++++++---
1 file changed, 74 insertions(+), 11 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index d297603b9..27a3f1dc4 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +106,18 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
long timestamp = req.getTimestamp();
int stageAttemptNumber = req.getStageAttemptNumber();
ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+ ShuffleBufferManager shuffleBufferManager =
shuffleServer.getShuffleBufferManager();
+ ShuffleTaskManager shuffleTaskManager =
shuffleServer.getShuffleTaskManager();
+ // If info is null, the pre-allocated buffer has already been removed by the
+ // preAllocatedBufferCheck thread; otherwise we need to release the required size.
+ PreAllocatedBufferInfo info =
+ shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
+ int requireSize = info == null ? 0 : info.getRequireSize();
+ int requireBlocksSize =
+ requireSize - req.encodedLength() < 0 ? 0 : requireSize -
req.encodedLength();
+
+ boolean isPreAllocated = info != null;
+
if (taskInfo == null) {
rpcResponse =
new RpcResponse(
@@ -116,9 +129,20 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ appId
+ "], shuffleId["
+ shuffleId
+ + "], isPreAllocated["
+ + isPreAllocated
+ "]";
LOG.error(errorMsg);
ShuffleServerMetrics.counterAppNotFound.inc();
+ releaseNettyBufferAndMetrics(
+ req,
+ appId,
+ shuffleId,
+ requireBufferId,
+ requireBlocksSize,
+ shuffleBufferManager,
+ info,
+ isPreAllocated);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
@@ -129,6 +153,25 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
String responseMessage = "A retry has occurred at the Stage, sending
data is invalid.";
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.STAGE_RETRY_IGNORE,
responseMessage);
+ LOG.warn(
+ "Stage retry occurred, appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "], stageAttemptNumber["
+ + stageAttemptNumber
+ + "], latestStageAttemptNumber["
+ + latestStageAttemptNumber
+ + "]");
+ releaseNettyBufferAndMetrics(
+ req,
+ appId,
+ shuffleId,
+ requireBufferId,
+ requireBlocksSize,
+ shuffleBufferManager,
+ info,
+ isPreAllocated);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
@@ -147,17 +190,11 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
.recordTransportTime(SendShuffleDataRequest.class.getName(),
transportTime);
}
}
- int requireSize =
shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
- int requireBlocksSize =
- requireSize - req.encodedLength() < 0 ? 0 : requireSize -
req.encodedLength();
StatusCode ret = StatusCode.SUCCESS;
String responseMessage = "OK";
if (req.getPartitionToBlocks().size() > 0) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
- ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
- PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
- boolean isPreAllocated = info != null;
if (!isPreAllocated) {
req.getPartitionToBlocks().values().stream()
.flatMap(Collection::stream)
@@ -180,7 +217,6 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
return;
}
final long start = System.currentTimeMillis();
- ShuffleBufferManager shuffleBufferManager =
shuffleServer.getShuffleBufferManager();
shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
long alreadyReleasedSize = 0;
@@ -198,7 +234,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
if (hasFailureOccurred) {
continue;
}
- ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated,
spd);
+ ret = shuffleTaskManager.cacheShuffleData(appId, shuffleId,
isPreAllocated, spd);
if (ret != StatusCode.SUCCESS) {
String errorMsg =
"Error happened when shuffleEngine.write for "
@@ -211,9 +247,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
} else {
long toReleasedSize = spd.getTotalBlockSize();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
- manager.releasePreAllocatedSize(toReleasedSize);
+ shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
- manager.updateCachedBlockIds(
+ shuffleTaskManager.updateCachedBlockIds(
appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
}
} catch (Exception e) {
@@ -240,7 +276,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
// removed, then after
// cacheShuffleData finishes, the preAllocatedSize should be updated
accordingly.
if (requireBlocksSize > alreadyReleasedSize) {
- manager.releasePreAllocatedSize(requireBlocksSize -
alreadyReleasedSize);
+ shuffleTaskManager.releasePreAllocatedSize(requireBlocksSize -
alreadyReleasedSize);
}
rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
long costTime = System.currentTimeMillis() - start;
@@ -269,6 +305,33 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
client.getChannel().writeAndFlush(rpcResponse);
}
+ private static void releaseNettyBufferAndMetrics( // shared cleanup used by the early-return paths of the SendShuffleDataRequest handler
+ SendShuffleDataRequest req,
+ String appId, // only used in the error log below
+ int shuffleId, // only used in the error log below
+ long requireBufferId, // only used in the error log below
+ long requireBlocksSize, // amount added to counterTotalReceivedDataSize when the request carries blocks
+ ShuffleBufferManager shuffleBufferManager,
+ PreAllocatedBufferInfo info, // may be null; dereferenced only when isPreAllocated is true
+ boolean isPreAllocated) { // callers compute this as (info != null)
+ if (isPreAllocated) { // guards the info dereference on the next line
+ shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true); // releases the full required size; the meaning of the (false, true) flags is defined by ShuffleBufferManager and is not visible here
+ }
+ if (MapUtils.isNotEmpty(req.getPartitionToBlocks())) { // true only for a non-null, non-empty map
+ // release the data buffer of every block in the request (presumably Netty reference-counted buffers, per the commit message - confirm)
+ req.getPartitionToBlocks().values().stream()
+ .flatMap(Collection::stream)
+ .forEach(block -> block.getData().release());
+ ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize); // still counted as received data although this helper only releases it
+ } else { // null or empty partitionToBlocks: nothing to release, just report it
+ LOG.error(
+ "Failed to handle send shuffle data request, no blocks found in this
request. appId: {}, shuffleId: {}, requireBufferId: {}",
+ appId,
+ shuffleId,
+ requireBufferId);
+ }
+ }
+
public void handleGetMemoryShuffleDataRequest(
TransportClient client, GetMemoryShuffleDataRequest req) {
String appId = req.getAppId();