This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f09482108 [CELEBORN-1867][FLINK] Fix flink client memory leak of 
TransportResponseHandler#outstandingRpcs for handling addCredit and 
notifyRequiredSegment response
f09482108 is described below

commit f09482108fe91f986b9cfceebd4b050379bf6f64
Author: codenohup <[email protected]>
AuthorDate: Fri Feb 21 13:50:54 2025 +0800

    [CELEBORN-1867][FLINK] Fix flink client memory leak of 
TransportResponseHandler#outstandingRpcs for handling addCredit and 
notifyRequiredSegment response
    
    ### What changes were proposed in this pull request?
    
    When I tested the performance of Flink with Celeborn in session mode, using 
both the shuffle-service plugin and hybrid shuffle integration strategies, I 
noticed that the task heap continuously grew even when no jobs were running.
    
    The issue arises because the Celeborn client sends addCredit or 
notifyRequiredSegment requests, expecting a response. This creates a callback 
and maintains a reference to CelebornBufferStream, SingleInputGate, and 
StreamTask.
    These callbacks are stored in TransportResponseHandler#outstandingRpcs and 
are cleared upon receiving a response.
    
    However, the Worker does not send a response for these two RPC requests, 
which leads to a significant memory leak in the client.
    
    ### Why are the changes needed?
    It may cause a Flink client memory leak.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested by rerunning TPC-DS in Flink session mode.
    
    Closes #3103 from codenohup/celeborn-1867.
    
    Authored-by: codenohup <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/worker/FetchHandler.scala       | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 3c99639eb..c58c535fc 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -32,7 +32,7 @@ import 
org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
 import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, 
MemoryFileInfo, ReduceFileMeta}
-import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, 
MemoryChunkBuffers, NioManagedBuffer}
+import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, 
MemoryChunkBuffers, NettyManagedBuffer, NioManagedBuffer}
 import org.apache.celeborn.common.network.client.{RpcResponseCallback, 
TransportClient}
 import org.apache.celeborn.common.network.protocol._
 import org.apache.celeborn.common.network.server.BaseMessageHandler
@@ -103,7 +103,7 @@ class FetchHandler(
       case r: BufferStreamEnd =>
         handleEndStreamFromClient(client, r.getStreamId)
       case r: ReadAddCredit =>
-        handleReadAddCredit(client, r.getCredit, r.getStreamId)
+        handleReadAddCredit(client, r.getCredit, r.getStreamId, -1)
       case r: ChunkFetchRequest =>
         handleChunkFetchRequest(client, r.streamChunkSlice, r)
       case unknown: RequestMessage =>
@@ -170,13 +170,18 @@ class FetchHandler(
           bufferStreamEnd.getStreamId,
           bufferStreamEnd.getStreamType)
       case readAddCredit: PbReadAddCredit =>
-        handleReadAddCredit(client, readAddCredit.getCredit, 
readAddCredit.getStreamId)
+        handleReadAddCredit(
+          client,
+          readAddCredit.getCredit,
+          readAddCredit.getStreamId,
+          rpcRequest.requestId)
       case notifyRequiredSegment: PbNotifyRequiredSegment =>
         handleNotifyRequiredSegment(
           client,
           notifyRequiredSegment.getRequiredSegmentId,
           notifyRequiredSegment.getStreamId,
-          notifyRequiredSegment.getSubPartitionId)
+          notifyRequiredSegment.getSubPartitionId,
+          rpcRequest.requestId)
       case chunkFetchRequest: PbChunkFetchRequest =>
         handleChunkFetchRequest(
           client,
@@ -480,13 +485,22 @@ class FetchHandler(
     }
   }
 
-  def handleReadAddCredit(client: TransportClient, credit: Int, streamId: 
Long): Unit = {
+  def handleReadAddCredit(
+      client: TransportClient,
+      credit: Int,
+      streamId: Long,
+      requestId: Long): Unit = {
     val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
     if (shuffleKey != null) {
       workerSource.recordAppActiveConnection(
         client,
         shuffleKey)
       creditStreamManager.addCredit(credit, streamId)
+      if (requestId != -1) {
+        client.getChannel.writeAndFlush(new RpcResponse(
+          requestId,
+          NettyManagedBuffer.EmptyBuffer))
+      }
     }
   }
 
@@ -494,7 +508,8 @@ class FetchHandler(
       client: TransportClient,
       requiredSegmentId: Int,
       streamId: Long,
-      subPartitionId: Int): Unit = {
+      subPartitionId: Int,
+      requestId: Long): Unit = {
     // process NotifyRequiredSegment request here, the MapPartitionDataReader 
will send data if the segment buffer is available.
     logDebug(
       s"Handle NotifyRequiredSegment with streamId: $streamId, 
requiredSegmentId: $requiredSegmentId")
@@ -504,6 +519,9 @@ class FetchHandler(
         client,
         shuffleKey)
       creditStreamManager.notifyRequiredSegment(requiredSegmentId, streamId, 
subPartitionId)
+      client.getChannel.writeAndFlush(new RpcResponse(
+        requestId,
+        NettyManagedBuffer.EmptyBuffer))
     }
   }
 

Reply via email to