[
https://issues.apache.org/jira/browse/FLINK-23807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qingsheng Ren updated FLINK-23807:
----------------------------------
Description:
{{MiniClusterTestEnvironment#triggerTaskManagerFailover}} checks the job status
to detect a restart:
{noformat}
terminateTaskManager();
CommonTestUtils.waitForJobStatus(
        jobClient,
        Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
        Deadline.fromNow(Duration.ofMinutes(5)));
afterFailAction.run();
startTaskManager();
{noformat}
However, `waitForJobStatus` polls only every 100 ms, while a restart can
complete within 100 ms. The poll can therefore easily miss the actual restart
and wait forever (or until the next restart happens because slots are missing).
We should rather use the `numRestarts` metric: read it before inducing the
error, then wait until the counter has increased.
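The counter-based wait proposed above can be sketched as follows. This is a minimal, self-contained illustration and not the actual fix: `RestartCounterWait`, `waitForIncrease`, and the `AtomicLong` standing in for the `numRestarts` metric are hypothetical names, and how the metric is actually read from the cluster is left open. The point it shows is that a monotonic counter still reveals a restart that began and finished between two polls, which a transient job status does not.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RestartCounterWait {

    /**
     * Polls a monotonically increasing counter until it exceeds the baseline
     * that was read before the failure was induced.
     * The supplier is a hypothetical stand-in for reading `numRestarts`.
     */
    public static long waitForIncrease(
            LongSupplier counter, long baseline, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long current = counter.getAsLong();
            if (current > baseline) {
                // A restart happened at some point after the baseline was taken,
                // even if the job is already RUNNING again.
                return current;
            }
            if (System.nanoTime() >= deadlineNanos) {
                throw new TimeoutException("Counter did not increase above " + baseline);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: this AtomicLong simulates the job's restart counter.
        AtomicLong numRestarts = new AtomicLong(0);

        // 1. Read the counter before inducing the error.
        long before = numRestarts.get();

        // 2. Simulate a restart that completes before the first poll.
        numRestarts.incrementAndGet();

        // 3. The wait still succeeds because the counter never goes back down.
        long after = waitForIncrease(
                numRestarts::get, before, Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println("restarts before=" + before + ", after=" + after);
    }
}
```

Taking the baseline before `terminateTaskManager()` is the essential step: if it were read afterwards, a fast restart could already be included in it and the wait would block just as the status poll does.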
Here is an excerpt from a log where the restart was not detected in time.
{noformat}
42769 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RUNNING to RESTARTING.
42774 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [ead7cad050ec7a264c0dba0b6e6a6ad9].
42775 [flink-akka.actor.default-dispatcher-23] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 543035cf9e19317f92ee559b70ac70bd
42776 [flink-akka.actor.default-dispatcher-22] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:RELEASING, resource profile: ResourceProfile{taskHeapMemory=170.667gb (183251937962 bytes), taskOffHeapMemory=170.667gb (183251937962 bytes), managedMemory=13.333mb (13981013 bytes), networkMemory=10.667mb (11184810 bytes)}, allocationId: ead7cad050ec7a264c0dba0b6e6a6ad9, jobId: 543035cf9e19317f92ee559b70ac70bd).
43780 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RESTARTING to RUNNING.
43783 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 543035cf9e19317f92ee559b70ac70bd from Checkpoint 11 @ 1629093422900 for 543035cf9e19317f92ee559b70ac70bd located at <checkpoint-not-externally-addressable>.
43798 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
43800 [SourceCoordinator-Source: Tested Source -> Sink: Data stream collect sink] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 11 for source Source: Tested Source -> Sink: Data stream collect sink to checkpoint.
43801 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Tested Source -> Sink: Data stream collect sink (1/1) (35c0ee7183308af02db4b09152f1457e) switched from CREATED to SCHEDULED.
{noformat}
UPDATE: A better implementation would use RestClient to detect whether tasks
have failed.
> Use RestClient to detect restarts in
> MiniClusterTestEnvironment#triggerTaskManagerFailover
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-23807
> URL: https://issues.apache.org/jira/browse/FLINK-23807
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Reporter: Arvid Heise
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0