[
https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
junzhong qin updated FLINK-34526:
---------------------------------
Description:
In our test case, the pipeline is:
!image-2024-02-27-15-50-25-071.png!
# parallelism = 100
# taskmanager.numberOfTaskSlots = 2
# disable checkpoint
h3. Phenomenon
When the worker was killed at 2024-02-27 15:10:13,691
{code:java}
2024-02-27 15:10:13,691 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker container_e2472_1706081484717_60538_01_000050 is terminated.
Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked as
failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on
request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a
non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal
{code}
It took about 20 seconds to restart the job.
{code:java}
2024-02-27 15:10:30,749 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0)
switched from RUNNING to FAILED on
container_e2472_1706081484717_60538_01_000050 @ xxx
(dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException:
TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) is
no longer reachable.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
at
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
Deploy and run task
2024-02-27 15:10:32,426 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (69/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (69/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
switched from INITIALIZING to RUNNING. {code}
h3. Reason
When the RM received the message the TM was killed, the JobMaster still kept
the connection with the killed TM. And the JobMaster found the TM is no longer
reachable after about 17 seconds.
h3. Solution:
We can reduce the restart time by disConnectTaskManager actively in
ResourceManager
{code:java}
// class ResourceManager
protected Optional<WorkerType> closeTaskManagerConnection(
final ResourceID resourceID, final Exception cause) {
// ....
// using JobManagerGateway to actively disconnect TaskManager
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} {code}
was:
In our test case, the pipeline is:
!image-2024-02-27-15-50-25-071.png!
# parallelism = 100
# taskmanager.numberOfTaskSlots = 2
# disable checkpoint
h3. Phenomenon
When the worker was killed at 2024-02-27 15:10:13,691
{code:java}
2024-02-27 15:10:13,691 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker container_e2472_1706081484717_60538_01_000050 is terminated.
Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked as
failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on
request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a
non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal
{code}
It took about 20 seconds to restart the job.
{code:java}
2024-02-27 15:10:30,749 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0)
switched from RUNNING to FAILED on
container_e2472_1706081484717_60538_01_000050 @ xxx
(dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException:
TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454) is
no longer reachable.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
at
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
Deploy and run task
2024-02-27 15:10:32,426 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (69/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (70/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
datagen_source[1] -> Sink: print_sink[2] (69/100)
(2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
switched from INITIALIZING to RUNNING. {code}
h3. Reason
When the RM received the message the TM was killed, the JobMaster still keep
the connection with the killed TM.
h3. Solution:
We can fix this by disConnectTaskManager in ResourceManager
{code:java}
protected Optional<WorkerType> closeTaskManagerConnection(
final ResourceID resourceID, final Exception cause) {
// ....
// using JobManagerGateway disconnect TaskManager
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} {code}
> Without shuffling data, when a Task Manager is killed, restarting the Flink
> job takes a considerable amount of time
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34526
> URL: https://issues.apache.org/jira/browse/FLINK-34526
> Project: Flink
> Issue Type: Sub-task
> Reporter: junzhong qin
> Assignee: junzhong qin
> Priority: Not a Priority
> Attachments: image-2024-02-27-15-50-25-071.png,
> image-2024-02-27-15-50-39-337.png
>
>
> In our test case, the pipeline is:
> !image-2024-02-27-15-50-25-071.png!
> # parallelism = 100
> # taskmanager.numberOfTaskSlots = 2
> # disable checkpoint
> h3. Phenomenon
> When the worker was killed at 2024-02-27 15:10:13,691
> {code:java}
> 2024-02-27 15:10:13,691 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker container_e2472_1706081484717_60538_01_000050 is terminated.
> Diagnostics: Container container_e2472_1706081484717_60538_01_000050 marked
> as failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container
> killed on request. Exit code is 137[2024-02-27 15:10:12.763]Container exited
> with a non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external
> signal {code}
> It took about 20 seconds to restart the job.
> {code:java}
> 2024-02-27 15:10:30,749 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> datagen_source[1] -> Sink: print_sink[2] (70/100)
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0)
> switched from RUNNING to FAILED on
> container_e2472_1706081484717_60538_01_000050 @ xxx
> (dataPort=38597).org.apache.flink.runtime.jobmaster.JobMasterException:
> TaskManager with id container_e2472_1706081484717_60538_01_000050(xxx:5454)
> is no longer reachable.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
> at
> org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)........//
> Deploy and run task
> 2024-02-27 15:10:32,426 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> datagen_source[1] -> Sink: print_sink[2] (70/100)
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
> switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:32,427 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> datagen_source[1] -> Sink: print_sink[2] (69/100)
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
> switched from DEPLOYING to INITIALIZING.2024-02-27 15:10:33,347 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> datagen_source[1] -> Sink: print_sink[2] (70/100)
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1)
> switched from INITIALIZING to RUNNING.2024-02-27 15:10:33,421 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> datagen_source[1] -> Sink: print_sink[2] (69/100)
> (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1)
> switched from INITIALIZING to RUNNING. {code}
>
> h3. Reason
> When the RM received the message the TM was killed, the JobMaster still kept
> the connection with the killed TM. And the JobMaster found the TM is no
> longer reachable after about 17 seconds.
> h3. Solution:
> We can reduce the restart time by disConnectTaskManager actively in
> ResourceManager
> {code:java}
> // class ResourceManager
> protected Optional<WorkerType> closeTaskManagerConnection(
> final ResourceID resourceID, final Exception cause) {
> // ....
> // using JobManagerGateway to actively disconnect TaskManager
>
> workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
> } {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)