This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f09482108 [CELEBORN-1867][FLINK] Fix flink client memory leak of
TransportResponseHandler#outstandingRpcs for handling addCredit and
notifyRequiredSegment response
f09482108 is described below
commit f09482108fe91f986b9cfceebd4b050379bf6f64
Author: codenohup <[email protected]>
AuthorDate: Fri Feb 21 13:50:54 2025 +0800
[CELEBORN-1867][FLINK] Fix flink client memory leak of
TransportResponseHandler#outstandingRpcs for handling addCredit and
notifyRequiredSegment response
### What changes were proposed in this pull request?
When I tested the performance of Flink with Celeborn in session mode, using
both the shuffle-service plugin and hybrid shuffle integration strategies, I
noticed that the task heap continuously grew even when no jobs were running.
The issue arises because the Celeborn client sends addCredit or
notifyRequiredSegment requests, expecting a response. This creates a callback
and maintains a reference to CelebornBufferStream, SingleInputGate, and
StreamTask.
These callbacks are stored in TransportResponseHandler#outstandingRpcs and
are cleared upon receiving a response.
However, the Worker does not send a response for these two RPC requests,
which leads to a significant memory leak in the client.
### Why are the changes needed?
It may cause a Flink client memory leak.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested by rerunning TPC-DS in Flink session mode.
Closes #3103 from codenohup/celeborn-1867.
Authored-by: codenohup <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/FetchHandler.scala | 30 +++++++++++++++++-----
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 3c99639eb..c58c535fc 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -32,7 +32,7 @@ import
org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta,
MemoryFileInfo, ReduceFileMeta}
-import org.apache.celeborn.common.network.buffer.{FileChunkBuffers,
MemoryChunkBuffers, NioManagedBuffer}
+import org.apache.celeborn.common.network.buffer.{FileChunkBuffers,
MemoryChunkBuffers, NettyManagedBuffer, NioManagedBuffer}
import org.apache.celeborn.common.network.client.{RpcResponseCallback,
TransportClient}
import org.apache.celeborn.common.network.protocol._
import org.apache.celeborn.common.network.server.BaseMessageHandler
@@ -103,7 +103,7 @@ class FetchHandler(
case r: BufferStreamEnd =>
handleEndStreamFromClient(client, r.getStreamId)
case r: ReadAddCredit =>
- handleReadAddCredit(client, r.getCredit, r.getStreamId)
+ handleReadAddCredit(client, r.getCredit, r.getStreamId, -1)
case r: ChunkFetchRequest =>
handleChunkFetchRequest(client, r.streamChunkSlice, r)
case unknown: RequestMessage =>
@@ -170,13 +170,18 @@ class FetchHandler(
bufferStreamEnd.getStreamId,
bufferStreamEnd.getStreamType)
case readAddCredit: PbReadAddCredit =>
- handleReadAddCredit(client, readAddCredit.getCredit,
readAddCredit.getStreamId)
+ handleReadAddCredit(
+ client,
+ readAddCredit.getCredit,
+ readAddCredit.getStreamId,
+ rpcRequest.requestId)
case notifyRequiredSegment: PbNotifyRequiredSegment =>
handleNotifyRequiredSegment(
client,
notifyRequiredSegment.getRequiredSegmentId,
notifyRequiredSegment.getStreamId,
- notifyRequiredSegment.getSubPartitionId)
+ notifyRequiredSegment.getSubPartitionId,
+ rpcRequest.requestId)
case chunkFetchRequest: PbChunkFetchRequest =>
handleChunkFetchRequest(
client,
@@ -480,13 +485,22 @@ class FetchHandler(
}
}
- def handleReadAddCredit(client: TransportClient, credit: Int, streamId:
Long): Unit = {
+ def handleReadAddCredit(
+ client: TransportClient,
+ credit: Int,
+ streamId: Long,
+ requestId: Long): Unit = {
val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
if (shuffleKey != null) {
workerSource.recordAppActiveConnection(
client,
shuffleKey)
creditStreamManager.addCredit(credit, streamId)
+ if (requestId != -1) {
+ client.getChannel.writeAndFlush(new RpcResponse(
+ requestId,
+ NettyManagedBuffer.EmptyBuffer))
+ }
}
}
@@ -494,7 +508,8 @@ class FetchHandler(
client: TransportClient,
requiredSegmentId: Int,
streamId: Long,
- subPartitionId: Int): Unit = {
+ subPartitionId: Int,
+ requestId: Long): Unit = {
// process NotifyRequiredSegment request here, the MapPartitionDataReader
will send data if the segment buffer is available.
logDebug(
s"Handle NotifyRequiredSegment with streamId: $streamId,
requiredSegmentId: $requiredSegmentId")
@@ -504,6 +519,9 @@ class FetchHandler(
client,
shuffleKey)
creditStreamManager.notifyRequiredSegment(requiredSegmentId, streamId,
subPartitionId)
+ client.getChannel.writeAndFlush(new RpcResponse(
+ requestId,
+ NettyManagedBuffer.EmptyBuffer))
}
}