SteNicholas opened a new pull request, #3239:
URL: https://github.com/apache/celeborn/pull/3239

   ### What changes were proposed in this pull request?
   
   When the client processes a heartbeat, it should also send a heartbeat to 
the worker, so that a worker with heartbeat enabled does not consider the 
connection read-idle.
   
   Follow-up to #1457.
   
   ### Why are the changes needed?
   
   In Flink batch jobs, a closed connection causes the following exception:
   ```
   2025-04-27 23:30:28
   java.io.IOException: Client /:9093 is lost, notify related stream 805472050177
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
        at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
        at org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:991)
   ```
   Troubleshooting with debug-level logging shows that the connection is closed 
because the worker, which has heartbeat enabled, detects read idleness on the 
channel:
   ```
   2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader [headerLength: 17, bodyLength: 26]
   2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
   2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
   2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
   2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
   2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
   2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
   2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
   2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
   2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
   2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
   2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
   2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
   2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
   2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
   ```
   The worker's read idleness results from the heartbeat being one-way, from 
worker to client, which only keeps the client's side of the channel active. 
The client should handle heartbeats by sending a heartbeat to the worker, which 
keeps the worker's side of the channel active as well.
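
   The effect can be illustrated with a small simulation. This is a minimal 
sketch, not Celeborn code: the class `HeartbeatIdleSketch` and the method 
`simulate` are hypothetical, time is modeled in whole seconds, and network 
latency is ignored. The 60-second heartbeat interval and the 240-second 
read-idle timeout are read off the debug log above (a 10-byte WRITE roughly 
every 60 seconds, and CLOSE exactly 240 seconds after the last READ at 
23:30:32,380); they are not taken from the actual configuration.

```java
// Hypothetical sketch: models only the worker's read-idle timer, nothing else.
class HeartbeatIdleSketch {

    /**
     * Simulates the worker side of one channel, second by second.
     *
     * @param clientRepliesToHeartbeat whether the client sends a heartbeat back
     * @param heartbeatIntervalSec     how often the worker writes a heartbeat
     * @param readIdleTimeoutSec       how long the worker tolerates no inbound data
     * @param durationSec              how many seconds to simulate
     * @return the second at which the worker closes the channel, or -1 if it stays open
     */
    static long simulate(boolean clientRepliesToHeartbeat,
                         long heartbeatIntervalSec,
                         long readIdleTimeoutSec,
                         long durationSec) {
        // Second 0 is the last real READ seen by the worker.
        long workerLastReadSec = 0;

        for (long now = 1; now <= durationSec; now++) {
            boolean workerSendsHeartbeat = now % heartbeatIntervalSec == 0;

            // A worker WRITE does not reset the worker's own read-idle timer.
            // Only data arriving from the client does.
            if (workerSendsHeartbeat && clientRepliesToHeartbeat) {
                workerLastReadSec = now;
            }

            // Read-idle check, as an idle-state handler would perform it.
            if (now - workerLastReadSec >= readIdleTimeoutSec) {
                return now;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long oneWay = simulate(false, 60, 240, 600);
        long twoWay = simulate(true, 60, 240, 600);
        System.out.println("one-way heartbeat: worker closes at second " + oneWay);
        System.out.println("two-way heartbeat: worker closes at second " + twoWay
                + " (-1 means the channel stays open)");
    }
}
```

   With these numbers, the one-way case closes the channel at second 240 even 
though the worker kept writing heartbeats, while the two-way case never reaches 
the timeout.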
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   `HeartbeatTest`
   

