SteNicholas opened a new pull request, #3154:
URL: https://github.com/apache/celeborn/pull/3154

   ### What changes were proposed in this pull request?
   
   `ReadClientHandler` should support reconnection for 
`ChannelInboundHandler#channelInactive`.
   
   ### Why are the changes needed?
   
   `ReadClientHandler` should support reconnection for 
ChannelInboundHandler#channelInactive to avoid Flink subtask failover because 
of the below exception:
   
   ```
   2025-03-10 15:20:54
   java.io.IOException: Client /11.35.44.216:9093 is lost, notify related 
stream 152545797850
        at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
        at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
        at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
        at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
        at 
java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
        at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
        at 
org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
        at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at 
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at 
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
        at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
        at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:991)
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   Introduce `celeborn.<module>.reconnect.maxRetries` and 
`celeborn.<module>.reconnect.retryWait` to support reconnection of client.
   
   ### How was this patch tested?
   
   `TransportClientFactorySuiteJ#testChannelInactiveReconnect `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to