[ 
https://issues.apache.org/jira/browse/FLINK-34528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17821631#comment-17821631
 ] 

Xintong Song commented on FLINK-34528:
--------------------------------------

I'm not sure about introducing an option for this.

A big concern of the Flink project is that it has too many configuration 
options, many of which are too detailed and require user to be extremely 
familiar with the Flink internals in order to use. The community is working 
hard to reduce such no-one-knows-how-to-use knobs, rather than introducing more 
of them.

Pursuing ultimate performance must not come at the price of throwing every 
single decision to the users. The engine itself should learn to smartly decide 
its behavior, or we should admit that there's no good way to achieve such 
improvements.

In this particular case, if the user wants to discover the TM lost faster, 
he/she can simply decrease the heartbeat timeout.

> Disconnect TM in JM when TM was killed to further reduce the job restart time
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-34528
>                 URL: https://issues.apache.org/jira/browse/FLINK-34528
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-02-27-16-35-04-464.png
>
>
> In https://issues.apache.org/jira/browse/FLINK-34526 we disconnect the killed 
> TM in RM. But in the following scenario, we can further reduce the restart 
> time.
> h3. Phenomenon
> In the test case, the pipeline looks like:
> !image-2024-02-27-16-35-04-464.png!
> The Source: Custom Source generates strings, and the job keyBy the strings to 
> Sink: Unnamed.
>  # parallelism = 100
>  # taskmanager.numberOfTaskSlots = 2
>  # disable checkpoint
> The worker was killed at 
> {code:java}
> 2024-02-27 16:41:49,982 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> Unnamed (6/100) 
> (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) 
> switched from RUNNING to FAILED on 
> container_e2472_1705993319725_62292_01_000046 @ xxx 
> (dataPort=38827).org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>  Connection unexpectedly closed by remote task manager 
> 'xxx/10.169.18.138:35983 [ 
> container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. This might 
> indicate that the remote task manager was lost.    at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at 
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
> {code:java}
> // The task was scheduled to a task manager that had already been killed
> 2024-02-27 16:41:53,506 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source (16/100) (attempt #3) with attempt id 
> 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and 
> vertex id bc764cd8ddf7a0cff126f51c16239658_15 to 
> container_e2472_1705993319725_62292_01_000010 @ xxx (dataPort=35983) with 
> allocation id 975dded4548ad15b36d0e5e6aac8f5b6
> // The last task switched from INITIALIZING to RUNNING
> 2024-02-27 16:42:05,176 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed 
> (64/100) 
> (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) 
> switched from INITIALIZING to RUNNING. {code}
> In this scenario, we found the task was repeatedly scheduled to a task 
> manager which has already been killed.
> h3. Solution
> We can disconnect the killed TM actively in JobMaster when updating the task 
> execution state.
> {code:java}
> // class JobMaster
> public CompletableFuture<Acknowledge> updateTaskExecutionState(final 
> TaskExecutionState taskExecutionState) {
>     // ...
>     // Throwable error = taskExecutionState.getError(userCodeLoader);
>     // disconnectTaskManager() if the TM was killed
>     if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
>         return CompletableFuture.completedFuture(Acknowledge.get());
>     } else {
>         // ...
>     }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to