[ https://issues.apache.org/jira/browse/FLINK-23807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qingsheng Ren updated FLINK-23807:
----------------------------------
    Description: 
{{MiniClusterTestEnvironment#triggerTaskManagerFailover}} checks the job status to detect a restart:
{noformat}
        terminateTaskManager();
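        // Race: waitForJobStatus below polls only every 100 ms; a restart that
        // completes faster than that never exposes FAILING/FAILED/RESTARTING
        // to the poller.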
        CommonTestUtils.waitForJobStatus(
                jobClient,
                Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, 
JobStatus.RESTARTING),
                Deadline.fromNow(Duration.ofMinutes(5)));
        afterFailAction.run();
        startTaskManager();
{noformat}
However, {{waitForJobStatus}} polls only every 100 ms, while a restart can complete within that interval. The poll can therefore miss the FAILING/FAILED/RESTARTING states entirely and wait forever, or it returns only when a later restart happens (e.g. because slots are missing).

We should instead use the {{numRestarts}} metric: record its value before inducing the failure, then wait until the counter has increased. Since the counter only ever goes up, even a restart that completes within one polling interval cannot be missed.
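A minimal sketch of that approach, assuming metric access through Flink's documented REST endpoint {{/jobs/<jobId>/metrics}} (class and helper names below are illustrative, not the actual fix):
{noformat}
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper: compare numRestarts before and after the induced failure.
final class RestartDetection {

    // Reads numRestarts via GET /jobs/<jobId>/metrics?get=numRestarts.
    // The response looks like [{"id":"numRestarts","value":"1"}], or [] before
    // the metric is first reported, in which case we treat the count as 0.
    static long fetchNumRestarts(String restAddress, String jobId) throws IOException {
        URL url = new URL(restAddress + "/jobs/" + jobId + "/metrics?get=numRestarts");
        try (InputStream in = url.openStream()) {
            String body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            Matcher m = Pattern.compile("\"value\"\\s*:\\s*\"(\\d+)\"").matcher(body);
            return m.find() ? Long.parseLong(m.group(1)) : 0L;
        }
    }

    // Blocks until numRestarts exceeds the value recorded before the failure.
    // Because the counter is monotonic, a restart that finishes between two
    // polls is still observed.
    static void waitForRestart(
            String restAddress, String jobId, long restartsBefore, Duration timeout)
            throws IOException, InterruptedException, TimeoutException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (fetchNumRestarts(restAddress, jobId) > restartsBefore) {
                return;
            }
            Thread.sleep(100);
        }
        throw new TimeoutException("Job did not restart within " + timeout);
    }
}
{noformat}
With that, the test records {{restartsBefore}} before {{terminateTaskManager()}} and calls {{waitForRestart(...)}} afterwards; even if the job is already back to RUNNING by the first poll, the counter has still moved.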

Here is an excerpt from a log where the restart was not detected in time.
{noformat}
42769 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager 
Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RUNNING to 
RESTARTING.
42774 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Releasing slot [ead7cad050ec7a264c0dba0b6e6a6ad9].
42775 [flink-akka.actor.default-dispatcher-23] INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job 543035cf9e19317f92ee559b70ac70bd
42776 [flink-akka.actor.default-dispatcher-22] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:0, state:RELEASING, resource profile: 
ResourceProfile{taskHeapMemory=170.667gb (183251937962 bytes), 
taskOffHeapMemory=170.667gb (183251937962 bytes), managedMemory=13.333mb 
(13981013 bytes), networkMemory=10.667mb (11184810 bytes)}, allocationId: 
ead7cad050ec7a264c0dba0b6e6a6ad9, jobId: 543035cf9e19317f92ee559b70ac70bd).
43780 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager 
Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RESTARTING 
to RUNNING.
43783 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 
543035cf9e19317f92ee559b70ac70bd from Checkpoint 11 @ 1629093422900 for 
543035cf9e19317f92ee559b70ac70bd located at 
<checkpoint-not-externally-addressable>.
43798 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state 
to restore
43800 [SourceCoordinator-Source: Tested Source -> Sink: Data stream collect 
sink] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
Recovering subtask 0 to checkpoint 11 for source Source: Tested Source -> Sink: 
Data stream collect sink to checkpoint.
43801 [flink-akka.actor.default-dispatcher-26] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Tested 
Source -> Sink: Data stream collect sink (1/1) 
(35c0ee7183308af02db4b09152f1457e) switched from CREATED to SCHEDULED.
{noformat}
 

UPDATE: A better implementation would use {{RestClient}} to detect whether tasks have failed.


> Use RestClient to detect restarts in 
> MiniClusterTestEnvironment#triggerTaskManagerFailover
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23807
>                 URL: https://issues.apache.org/jira/browse/FLINK-23807
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>            Reporter: Arvid Heise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
