[ 
https://issues.apache.org/jira/browse/FLINK-34528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17821159#comment-17821159
 ] 

junzhong qin commented on FLINK-34528:
--------------------------------------

Hi [~wanglijie], good question. The exception is "Connection unexpectedly closed 
by remote task manager *. This might indicate that the remote task manager was 
lost." It does not always mean that the TM was killed or is unavailable. One 
choice might be an option that enables or disables disconnecting the TM in the 
JM when the task manager is reported lost (for example, turning it on when 
redundant TMs are enabled). If you have any better ideas, feel free to share 
them as well.
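
To make the option idea concrete, here is a minimal sketch. It is not Flink code: the class, the method names, and the boolean switch are all hypothetical, and only the message text is taken from the exception quoted above. It shows a disconnect being considered only when the switch is on and the "lost" hint appears somewhere in the failure's cause chain.

```java
// Sketch only: every name here is hypothetical, none of it is Flink API.
class DisconnectOptionSketch {

    // Text taken from the exception message quoted in this comment.
    static final String LOST_HINT =
            "This might indicate that the remote task manager was lost";

    /** True if any exception in the cause chain carries the "lost" hint. */
    static boolean looksLikeRemoteLoss(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains(LOST_HINT)) {
                return true;
            }
        }
        return false;
    }

    /** The hint only leads to a disconnect when the hypothetical option is on. */
    static boolean shouldDisconnect(boolean optionEnabled, Throwable error) {
        return optionEnabled && looksLikeRemoteLoss(error);
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException(
                "task failed",
                new java.io.IOException(
                        "Connection unexpectedly closed by remote task manager 'xxx'. "
                                + LOST_HINT + "."));
        System.out.println("option on:  " + shouldDisconnect(true, failure));
        System.out.println("option off: " + shouldDisconnect(false, failure));
    }
}
```

With the switch off, the behaviour stays as it is today, so a transient network problem that produces the same message would not cause a healthy TM to be disconnected.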

> Disconnect TM in JM when TM was killed to further reduce the job restart time
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-34528
>                 URL: https://issues.apache.org/jira/browse/FLINK-34528
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-02-27-16-35-04-464.png
>
>
> In https://issues.apache.org/jira/browse/FLINK-34526 we disconnect the killed 
> TM in RM. But in the following scenario, we can further reduce the restart 
> time.
> h3. Phenomenon
> In the test case, the pipeline looks like:
> !image-2024-02-27-16-35-04-464.png!
> The Source: Custom Source generates strings, and the job keys the stream by 
> those strings (keyBy) before sending it to Sink: Unnamed.
>  # parallelism = 100
>  # taskmanager.numberOfTaskSlots = 2
>  # checkpointing disabled
> The worker was killed at 
> {code:java}
> 2024-02-27 16:41:49,982 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (6/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) switched from RUNNING to FAILED on container_e2472_1705993319725_62292_01_000046 @ xxx (dataPort=38827).
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx/10.169.18.138:35983 [ container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. This might indicate that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
> {code:java}
> // The task was scheduled to a task manager that had already been killed
> 2024-02-27 16:41:53,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source (16/100) (attempt #3) with attempt id 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and vertex id bc764cd8ddf7a0cff126f51c16239658_15 to container_e2472_1705993319725_62292_01_000010 @ xxx (dataPort=35983) with allocation id 975dded4548ad15b36d0e5e6aac8f5b6
> // The last task switched from INITIALIZING to RUNNING
> 2024-02-27 16:42:05,176 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (64/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) switched from INITIALIZING to RUNNING. {code}
> In this scenario, we found that tasks were repeatedly scheduled to a task 
> manager that had already been killed.
> h3. Solution
> We can proactively disconnect the killed TM in the JobMaster when updating 
> the task execution state.
> {code:java}
> // class JobMaster
> public CompletableFuture<Acknowledge> updateTaskExecutionState(
>         final TaskExecutionState taskExecutionState) {
>     // ...
>     // Throwable error = taskExecutionState.getError(userCodeLoader);
>     // disconnectTaskManager() if the TM was killed
>     if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
>         return CompletableFuture.completedFuture(Acknowledge.get());
>     } else {
>         // ...
>     }
> } {code}
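
The Solution quoted above can be sketched in a self-contained form. This is an illustration under assumptions, not the proposed patch: `RemoteLossHandlerSketch`, `Disconnector`, `remoteContainerId`, and `handleFailure` are hypothetical names, and the regular expression assumes the message layout seen in the log excerpt (a container id inside `[ ... ]`), which may differ in other deployments. The point it illustrates is that the failure is reported by one TM (container ..._000046 in the log) but names a different, remote TM (container ..._000010), and it is the remote one that would be disconnected.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: hypothetical names, not Flink API.
class RemoteLossHandlerSketch {

    // Assumes the "[ container_...(host:port) ]" layout from the log excerpt.
    private static final Pattern REMOTE_CONTAINER = Pattern.compile(
            "closed by remote task manager '.*?\\[\\s*(container_\\w+)\\(");

    /** Hypothetical stand-in for whatever disconnects a TM in the JobMaster. */
    interface Disconnector {
        void disconnect(String containerId, String reason);
    }

    /** Finds the remote container id named anywhere in the cause chain. */
    static Optional<String> remoteContainerId(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message == null) {
                continue;
            }
            Matcher matcher = REMOTE_CONTAINER.matcher(message);
            if (matcher.find()) {
                return Optional.of(matcher.group(1));
            }
        }
        return Optional.empty();
    }

    /** Disconnects the remote TM at most once; returns true if it did so. */
    static boolean handleFailure(
            Throwable error, Set<String> alreadyDisconnected, Disconnector disconnector) {
        Optional<String> remote = remoteContainerId(error);
        if (!remote.isPresent() || !alreadyDisconnected.add(remote.get())) {
            return false;
        }
        disconnector.disconnect(remote.get(), "reported lost by a peer task manager");
        return true;
    }

    public static void main(String[] args) {
        Throwable failure = new RuntimeException(
                "Connection unexpectedly closed by remote task manager "
                        + "'xxx/10.169.18.138:35983 [ "
                        + "container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. "
                        + "This might indicate that the remote task manager was lost.");
        Set<String> seen = new HashSet<>();
        Disconnector printer =
                (id, reason) -> System.out.println("disconnect " + id + ": " + reason);
        handleFailure(failure, seen, printer); // disconnects once
        handleFailure(failure, seen, printer); // second report is ignored
    }
}
```

Remembering which TMs were already disconnected keeps repeated reports from the many tasks that shared connections with the lost TM from triggering the same disconnect again. Whether a single peer report is enough evidence is the open question raised in the comment.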



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
