[ 
https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junzhong qin updated FLINK-34526:
---------------------------------
    Description: 
In our test case, the pipeline is:

!image-2024-02-27-15-50-25-071.png!
 # parallelism = 100
 # taskmanager.numberOfTaskSlots = 2
 # disable checkpoint

h3. Phenomenon

When the worker was killed at 2024-02-27 15:10:13,691
{code:java}
2024-02-27 15:10:13,691 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker container_e2472_1706081484717_60538_01_000050 is terminated. 
Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked as 
failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on 
request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a 
non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal 
{code}
It took about 20 seconds to restart the job.
{code:java}
2024-02-27 15:10:30,749 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) 
switched from RUNNING to FAILED on 
container_e2472_1706081484717_60538_01_000050 @ xxx 
(dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException: 
TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) is 
no longer reachable.
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
    at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
 Deploy and run task
2024-02-27 15:10:32,426 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (69/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (69/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
switched from INITIALIZING to RUNNING. {code}
 
h3. Reason

When the RM received the message the TM was killed, the JobMaster still kept 
the connection with the killed TM.  And the JobMaster found the TM is no longer 
reachable after about 17 seconds.
h3. Solution:

We can reduce the restart time by disConnectTaskManager actively in 
ResourceManager
{code:java}
// class ResourceManager
protected Optional<WorkerType> closeTaskManagerConnection(
        final ResourceID resourceID, final Exception cause) {
   // ....
   // using JobManagerGateway to actively disconnect TaskManager 
    
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} {code}
 

 

  was:
In our test case, the pipeline is:

!image-2024-02-27-15-50-25-071.png!
 # parallelism = 100
 # taskmanager.numberOfTaskSlots = 2
 # disable checkpoint

h3. Phenomenon

When the worker was killed at 2024-02-27 15:10:13,691
{code:java}
2024-02-27 15:10:13,691 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker container_e2472_1706081484717_60538_01_000050 is terminated. 
Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked as 
failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on 
request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a 
non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal 
{code}
It took about 20 seconds to restart the job.
{code:java}
2024-02-27 15:10:30,749 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) 
switched from RUNNING to FAILED on 
container_e2472_1706081484717_60538_01_000050 @ xxx 
(dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException: 
TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) is 
no longer reachable.
    at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
    at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
    at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
 Deploy and run task
2024-02-27 15:10:32,426 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (69/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (70/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
datagen_source[1] -> Sink: print_sink[2] (69/100) 
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
switched from INITIALIZING to RUNNING. {code}
 
h3. Reason

When the RM received the message the TM was killed, the JobMaster still keep 
the connection with the killed TM.
h3. Solution:

We can fix this by disConnectTaskManager in ResourceManager

 
{code:java}
protected Optional<WorkerType> closeTaskManagerConnection(
        final ResourceID resourceID, final Exception cause) {
   // ....
   // using JobManagerGateway disconnect TaskManager
    
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} {code}
 

 


> Without shuffling data, when a Task Manager is killed, restarting the Flink 
> job takes a considerable amount of time
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34526
>                 URL: https://issues.apache.org/jira/browse/FLINK-34526
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-02-27-15-50-25-071.png, 
> image-2024-02-27-15-50-39-337.png
>
>
> In our test case, the pipeline is:
> !image-2024-02-27-15-50-25-071.png!
>  # parallelism = 100
>  # taskmanager.numberOfTaskSlots = 2
>  # disable checkpoint
> h3. Phenomenon
> When the worker was killed at 2024-02-27 15:10:13,691
> {code:java}
> 2024-02-27 15:10:13,691 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e2472_1706081484717_60538_01_000050 is terminated. 
> Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked 
> as failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container 
> killed on request. Exit code is 137[2024-02-27 15:10:12.763]Container exited 
> with a non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external 
> signal {code}
> It took about 20 seconds to restart the job.
> {code:java}
> 2024-02-27 15:10:30,749 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) 
> switched from RUNNING to FAILED on 
> container_e2472_1706081484717_60538_01_000050 @ xxx 
> (dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException: 
> TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) 
> is no longer reachable.
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
>     at 
> org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
>  Deploy and run task
> 2024-02-27 15:10:32,426 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
> switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (69/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
> switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (70/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) 
> switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
> datagen_source[1] -> Sink: print_sink[2] (69/100) 
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) 
> switched from INITIALIZING to RUNNING. {code}
>  
> h3. Reason
> When the RM received the message the TM was killed, the JobMaster still kept 
> the connection with the killed TM.  And the JobMaster found the TM is no 
> longer reachable after about 17 seconds.
> h3. Solution:
> We can reduce the restart time by disConnectTaskManager actively in 
> ResourceManager
> {code:java}
> // class ResourceManager
> protected Optional<WorkerType> closeTaskManagerConnection(
>         final ResourceID resourceID, final Exception cause) {
>    // ....
>    // using JobManagerGateway to actively disconnect TaskManager 
>     
> workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to