[ 
https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17821640#comment-17821640
 ] 

Xintong Song commented on FLINK-34526:
--------------------------------------

This makes sense to me.

Just to provide some background on this: Flink's JM, RM and TM coordinate 
based on the assumption that the ground truth of how TMs' resources are 
allocated to JMs lies with the TMs. Therefore, to avoid inconsistency, the RM 
does not tell a JM which resource (slot) from which TM is (de)allocated for 
it, nor does it tell a JM to connect to or disconnect from a TM.

However, IMHO this might be a bit overprotective. If a TM is known for sure 
to have been terminated, I don't see any problem in notifying the relevant 
JMs about it earlier.
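
To make the idea concrete, here is a minimal sketch in plain Java. It is not 
Flink code: SketchJobMaster, SketchResourceManager and their methods are 
hypothetical names made up for illustration. It assumes the RM only forwards 
a "this TM is gone" hint once the cluster framework has reported the worker 
as terminated, and that each JM decides for itself whether the hint is 
relevant, so the RM still does not need to know which JM holds slots on 
which TM.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a JM; not Flink's JobMaster. */
final class SketchJobMaster {
    final String jobId;
    final Set<String> connectedTaskManagers = new HashSet<>();
    final List<String> failovers = new ArrayList<>();

    SketchJobMaster(String jobId) {
        this.jobId = jobId;
    }

    /**
     * Reached either via the RM's hint or via the JM's own heartbeat path.
     * Idempotent: a TM the JM is not (or no longer) connected to is ignored,
     * so a late heartbeat failure after the hint does not fail the job twice.
     */
    void disconnectTaskManager(String taskManagerId, String cause) {
        if (connectedTaskManagers.remove(taskManagerId)) {
            failovers.add(taskManagerId + ": " + cause);
        }
    }
}

/** Hypothetical stand-in for the RM; not Flink's ResourceManager. */
final class SketchResourceManager {
    private final List<SketchJobMaster> registeredJobMasters = new ArrayList<>();

    void registerJobMaster(SketchJobMaster jobMaster) {
        registeredJobMasters.add(jobMaster);
    }

    /** Called only when the worker is known for sure to be terminated. */
    void onWorkerTerminated(String taskManagerId, String diagnostics) {
        // The RM does not track slot ownership, so it notifies every
        // registered JM and lets each one filter by its own connections.
        for (SketchJobMaster jobMaster : registeredJobMasters) {
            jobMaster.disconnectTaskManager(taskManagerId, diagnostics);
        }
    }
}
```

With this shape the TMs remain the source of truth for slot allocation. The 
RM only shortens the time a JM needs to learn about a TM that no longer 
exists.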

> Actively disconnect the killed TM in RM to reduce restart time
> --------------------------------------------------------------
>
>                 Key: FLINK-34526
>                 URL: https://issues.apache.org/jira/browse/FLINK-34526
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-02-27-15-50-25-071.png, 
> image-2024-02-27-15-50-39-337.png
>
>
> In our test case, the pipeline is:
> !image-2024-02-27-15-50-25-071.png!
>  # parallelism = 100
>  # taskmanager.numberOfTaskSlots = 2
>  # disable checkpoint
> h3. Phenomenon
> The worker was killed at 2024-02-27 15:10:13,691:
> {code:java}
> 2024-02-27 15:10:13,691 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e2472_1706081484717_60538_01_000050 is terminated. 
> Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked 
> as failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container 
> killed on request. Exit code is 137[2024-02-27 15:10:12.763]Container exited 
> with a non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external 
> signal {code}
> It took about 20 seconds to restart the job.
> {code:java}
> 2024-02-27 15:10:30,749 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) 
> switched from RUNNING to FAILED on 
> container_e2472_1706081484717_60538_01_000050 @ xxx 
> (dataPort=38597).
> org.apache.flink.runtime.jobmaster.JobMasterException: 
> TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) 
> is no longer reachable.
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
>     at 
> org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> ........
> // Deploy and run task
> 2024-02-27 15:10:32,426 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
> switched from DEPLOYING to INITIALIZING.
> 2024-02-27 15:10:32,427 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (69/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
> switched from DEPLOYING to INITIALIZING.
> 2024-02-27 15:10:33,347 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
> switched from INITIALIZING to RUNNING.
> 2024-02-27 15:10:33,421 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (69/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
> switched from INITIALIZING to RUNNING. {code}
>  
> h3. Reason
> When the RM received the message that the TM had been killed, the JobMaster 
> still kept its connection to the killed TM. The JobMaster only found that 
> the TM was no longer reachable about 17 seconds later.
> h3. Solution:
> We can reduce the restart time by actively disconnecting the TaskManager 
> in the ResourceManager:
> {code:java}
> // class ResourceManager
> protected Optional<WorkerType> closeTaskManagerConnection(
>         final ResourceID resourceID, final Exception cause) {
>     // ....
>     // using JobManagerGateway to actively disconnect TaskManager
>     workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
> } {code}
>  
>  
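
The "about 17 seconds" and "about 20 seconds" figures in the description can 
be checked against the three log timestamps quoted there. A small sketch, 
assuming the log timestamps use the pattern "yyyy-MM-dd HH:mm:ss,SSS"; the 
class name RestartTimeline is made up for illustration:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class RestartTimeline {
    // Timestamp layout as it appears in the quoted log lines.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(
                        LocalDateTime.parse(from, LOG_TS),
                        LocalDateTime.parse(to, LOG_TS))
                .toMillis();
    }

    public static void main(String[] args) {
        String workerKilled = "2024-02-27 15:10:13,691"; // RM logs termination
        String taskFailed = "2024-02-27 15:10:30,749";   // JM sees TM unreachable
        String lastRunning = "2024-02-27 15:10:33,421";  // last quoted task RUNNING

        System.out.println("kill -> JM notices:    "
                + millisBetween(workerKilled, taskFailed) + " ms");
        System.out.println("kill -> running again: "
                + millisBetween(workerKilled, lastRunning) + " ms");
    }
}
```

This gives 17058 ms until the JM notices the failure and 19730 ms until the 
last quoted task is RUNNING again, so the heartbeat-based detection accounts 
for most of the restart time in this test.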



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
