[ 
https://issues.apache.org/jira/browse/FLINK-34528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junzhong qin updated FLINK-34528:
---------------------------------
    Description: 
In https://issues.apache.org/jira/browse/FLINK-34526, we disconnect the killed TaskManager (TM) in the ResourceManager (RM). In the following scenario, however, the restart time can be reduced further.
h3. Phenomenon

In the test case, the pipeline looks like:

!image-2024-02-27-16-35-04-464.png!

Source: Custom Source generates strings, and the job applies keyBy on those strings before passing them to Sink: Unnamed. The job is configured with:
 # parallelism = 100
 # taskmanager.numberOfTaskSlots = 2
 # checkpointing disabled
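
To show why the keyBy exchange matters, here is a back-of-the-envelope model (not Flink code) that counts how many sink subtasks lose an input when a single TM dies. It assumes that default slot sharing places source subtask i and sink subtask i in slot i, that slots are packed two per TM, and that operator chaining is ignored. The class, enum and method names are hypothetical.

{code:java}
/**
 * Hypothetical model (not Flink code): how many sink subtasks lose at least one
 * input channel when a single TaskManager is killed.
 * Assumptions: subtask i of both operators runs in slot i (default slot sharing),
 * slots are packed SLOTS_PER_TM per TaskManager, and operator chaining is ignored.
 */
final class ShuffleBlastRadius {
    static final int PARALLELISM = 100; // parallelism = 100
    static final int SLOTS_PER_TM = 2;  // taskmanager.numberOfTaskSlots = 2

    enum Exchange { FORWARD, KEY_BY }

    /** Number of TaskManagers needed to host all slots. */
    static int taskManagers() {
        return (PARALLELISM + SLOTS_PER_TM - 1) / SLOTS_PER_TM;
    }

    /** TaskManager hosting subtask `index` under the packing assumption above. */
    static int tmOf(int index) {
        return index / SLOTS_PER_TM;
    }

    /** Counts sink subtasks that either ran on, or read from, the killed TaskManager. */
    static int affectedSinks(Exchange exchange, int killedTm) {
        int affected = 0;
        for (int sink = 0; sink < PARALLELISM; sink++) {
            boolean lost = tmOf(sink) == killedTm; // the sink itself ran on the killed TM
            if (exchange == Exchange.FORWARD) {
                int source = sink; // pointwise: sink i reads only from source i
                lost |= tmOf(source) == killedTm;
            } else {
                // keyBy: every sink reads one partition from every source
                for (int source = 0; source < PARALLELISM; source++) {
                    lost |= tmOf(source) == killedTm;
                }
            }
            if (lost) {
                affected++;
            }
        }
        return affected;
    }

    public static void main(String[] args) {
        int killedTm = 5; // arbitrary TaskManager index
        System.out.println("TaskManagers: " + taskManagers());
        System.out.println("forward: " + affectedSinks(Exchange.FORWARD, killedTm));
        System.out.println("keyBy:   " + affectedSinks(Exchange.KEY_BY, killedTm));
    }
}
{code}

Under these assumptions the job needs 50 TMs; a pointwise connection would affect only the 2 sinks co-located with the killed TM, while keyBy breaks an input of all 100 sinks. With region failover, an all-to-all pipelined exchange also typically places the whole job in a single failover region, so the entire job restarts.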

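The worker (container_e2472_1705993319725_62292_01_000010) was killed at about 16:41:49. The failure surfaced in a sink subtask on another TM that lost its connection to the killed one: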
{code:java}
2024-02-27 16:41:49,982 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (6/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) switched from RUNNING to FAILED on container_e2472_1705993319725_62292_01_000046 @ xxx (dataPort=38827).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx/10.169.18.138:35983 [ container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
{code:java}
// The task was scheduled to a task manager that had already been killed
2024-02-27 16:41:53,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source (16/100) (attempt #3) with attempt id 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and vertex id bc764cd8ddf7a0cff126f51c16239658_15 to container_e2472_1705993319725_62292_01_000010 @ xxx (dataPort=35983) with allocation id 975dded4548ad15b36d0e5e6aac8f5b6

// The last task switched from INITIALIZING to RUNNING
2024-02-27 16:42:05,176 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (64/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) switched from INITIALIZING to RUNNING. {code}
In this scenario, we found that tasks were repeatedly scheduled to the TM that had already been killed (container_e2472_1705993319725_62292_01_000010); the last task did not reach RUNNING until 16:42:05.
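
The intervals involved can be read directly off the log excerpts. A minimal sketch using only java.time, assuming the timestamp format shown in the log lines (the class name RestartWindow is illustrative):

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Computes intervals between the quoted log timestamps (illustrative helper). */
final class RestartWindow {
    // Matches timestamps such as "2024-02-27 16:41:49,982" in the log excerpts.
    private static final DateTimeFormatter LOG_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static Duration between(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TIMESTAMP),
                LocalDateTime.parse(to, LOG_TIMESTAMP));
    }

    public static void main(String[] args) {
        String firstFailure = "2024-02-27 16:41:49,982";       // Sink: Unnamed (6/100) fails
        String redeployToKilledTm = "2024-02-27 16:41:53,506"; // Source (16/100) deployed to the killed TM
        String lastRunning = "2024-02-27 16:42:05,176";        // Sink: Unnamed (64/100) reaches RUNNING
        System.out.println("failure -> deploy to killed TM: "
                + between(firstFailure, redeployToKilledTm).toMillis() + " ms");
        System.out.println("failure -> last task RUNNING: "
                + between(firstFailure, lastRunning).toMillis() + " ms");
    }
}
{code}

This gives 3524 ms from the first failure until a task is deployed to the killed TM, and 15194 ms until the last task is RUNNING; neither figure includes the time between the kill itself and the first failure log line.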
h3. Solution

We can actively disconnect the killed TM in the JobMaster when updating the task execution state, so that its slots are released and no further tasks are deployed to it.
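
The idea can be illustrated with a toy model. This is a minimal sketch under explicit assumptions: ToyJobMaster, RemoteTaskManagerLost, onTaskFailed() and deploy() are hypothetical stand-ins, not Flink classes, and slot allocation is reduced to filling free slots in TM-id order. As in the log above, the FAILED update comes from a healthy TM; the failure cause names the remote TM that was lost, and that is the TM to disconnect.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model of the proposed fix; none of these types are Flink APIs. */
final class DisconnectOnRemoteLoss {

    /** Hypothetical failure cause naming the remote TaskManager that went away. */
    static final class RemoteTaskManagerLost extends Exception {
        final String lostTaskManager;

        RemoteTaskManagerLost(String lostTaskManager) {
            super("Connection unexpectedly closed by remote task manager " + lostTaskManager);
            this.lostTaskManager = lostTaskManager;
        }
    }

    /** Toy JobMaster tracking free slots per registered TaskManager id. */
    static final class ToyJobMaster {
        // TreeMap gives deploy() a deterministic TaskManager order.
        private final Map<String, Integer> freeSlots = new TreeMap<>();
        private final boolean disconnectOnRemoteLoss;

        ToyJobMaster(boolean disconnectOnRemoteLoss) {
            this.disconnectOnRemoteLoss = disconnectOnRemoteLoss;
        }

        void registerTaskManager(String taskManager, int slots) {
            freeSlots.put(taskManager, slots);
        }

        /** Stand-in for handling a task execution state update that reports FAILED. */
        void onTaskFailed(Throwable cause) {
            if (disconnectOnRemoteLoss && cause instanceof RemoteTaskManagerLost) {
                // Proposed behaviour: disconnect the lost TM now and drop its slots,
                // rather than waiting until its loss is detected by other means.
                freeSlots.remove(((RemoteTaskManagerLost) cause).lostTaskManager);
            }
        }

        /** Assigns `tasks` redeployed tasks to free slots; returns the chosen TM per task. */
        List<String> deploy(int tasks) {
            List<String> targets = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : freeSlots.entrySet()) {
                for (int slot = 0; slot < entry.getValue() && targets.size() < tasks; slot++) {
                    targets.add(entry.getKey());
                }
            }
            if (targets.size() < tasks) {
                throw new IllegalStateException("not enough slots for " + tasks + " tasks");
            }
            return targets;
        }
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            ToyJobMaster jobMaster = new ToyJobMaster(fix);
            jobMaster.registerTaskManager("tm-a", 2);
            jobMaster.registerTaskManager("tm-b", 2);
            jobMaster.registerTaskManager("tm-c", 2);
            // tm-b is killed; a task on a healthy TM reports the lost connection.
            jobMaster.onTaskFailed(new RemoteTaskManagerLost("tm-b"));
            System.out.println("disconnectOnRemoteLoss=" + fix + " -> " + jobMaster.deploy(4));
        }
    }
}
{code}

Without the proposed behaviour, the redeployment still uses tm-b's slots ([tm-a, tm-a, tm-b, tm-b]); with it, tm-b is dropped as soon as the failure is reported and the tasks land on tm-a and tm-c.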

 

  was:
In the test case, the pipeline looks like:

!image-2024-02-27-16-35-04-464.png!

Source: Custom Source generates strings, and the job applies keyBy on those strings before passing them to Sink: Unnamed. The job is configured with:
 # parallelism = 100
 # taskmanager.numberOfTaskSlots = 2
 # checkpointing disabled

The worker was killed at about 16:41:49, which caused the following task failure:
{code:java}
2024-02-27 16:41:49,982 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (6/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) switched from RUNNING to FAILED on container_e2472_1705993319725_62292_01_000046 @ xxx (dataPort=38827).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx/10.169.18.138:35983 [ container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
The job took about 16 seconds to restart.
{code:java}
// The task was scheduled to a task manager that had already been killed
2024-02-27 16:41:53,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source (16/100) (attempt #3) with attempt id 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and vertex id bc764cd8ddf7a0cff126f51c16239658_15 to container_e2472_1705993319725_62292_01_000010 @ xxx (dataPort=35983) with allocation id 975dded4548ad15b36d0e5e6aac8f5b6

// The last task switched from INITIALIZING to RUNNING
2024-02-27 16:42:05,176 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (64/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) switched from INITIALIZING to RUNNING. {code}


> With shuffling data, when a Task Manager is killed, restarting the Flink job 
> takes a considerable amount of time
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34528
>                 URL: https://issues.apache.org/jira/browse/FLINK-34528
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-02-27-16-35-04-464.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
