[ 
https://issues.apache.org/jira/browse/FLINK-36295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882730#comment-17882730
 ] 

Matthias Pohl edited comment on FLINK-36295 at 9/18/24 2:21 PM:
----------------------------------------------------------------

I guess I found the reason why we do not retrieve a checkpoint history in the
failed test run:
# The job is started and reaches WaitingForResources state with only one TM
being available (2 slots, i.e. only the sufficient resources are met, not the
desired ones).
# The 2nd TM is not added while the job is in WaitingForResources state, which
makes the StateTransitionManager (STM) trigger the state transition into
CreatingExecutionGraph state.
# While the ExecutionGraph is being created, the 2nd TM becomes available. The
ExecutionGraph creation is already ongoing and doesn't consider the newly
added slots.
# The AdaptiveScheduler (AS) reaches Executing state, where the onChange and
onTrigger events are initiated, triggering the STM's onChange and onTrigger
handling. These events do not consider the newly added TM slots yet because of
FLINK-36279 (only free slots are considered, not the ones that are already
allocated to the job). Hence, the desired resources appear not to be met.
The STM changes into the Stabilized phase and waits for a new onTrigger (which
would be a new checkpoint).
# The job runs with a parallelism of 2 until the checkpoint is triggered.
That makes the STM trigger the rescale, cancelling the two subtasks.
# While the job is restarting, one TM is stopped by the test code.
# The AS transitions from Restarting state right into CreatingExecutionGraph
state (FLINK-36013) while one TM is still in the process of stopping.
# The ExecutionGraph is now picked up with a parallelism of 4 (because the
slots of the TM that is being shut down are still available).
# At the end of the CreatingExecutionGraph state, a transition back to
WaitingForResources state is performed because 2 slots are gone.
# The job reaches Executing state with a parallelism of 2.
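The resource check described in item 4 can be sketched as follows. This is a hypothetical, self-contained model — none of the identifiers below are actual Flink classes or methods — meant only to illustrate why counting free slots alone (the FLINK-36279 behavior) makes the desired resources appear unmet once the job's original slots are allocated:

```java
// Hypothetical sketch of the resource check from item 4 (FLINK-36279).
// All names are illustrative, not actual Flink identifiers.
public class ResourceCheckSketch {

    // Buggy behavior: only free slots are considered, slots already
    // allocated to the job are ignored.
    static boolean desiredResourcesMet(int freeSlots, int slotsAllocatedToJob, int desiredSlots) {
        return freeSlots >= desiredSlots;
    }

    // Fixed behavior: slots already serving the job also count towards
    // the desired resources.
    static boolean desiredResourcesMetFixed(int freeSlots, int slotsAllocatedToJob, int desiredSlots) {
        return freeSlots + slotsAllocatedToJob >= desiredSlots;
    }

    public static void main(String[] args) {
        // The 2nd TM contributed 2 free slots while the first 2 slots are
        // already allocated; the desired parallelism needs 4 slots.
        System.out.println(desiredResourcesMet(2, 2, 4));      // false -> STM stabilizes at p=2
        System.out.println(desiredResourcesMetFixed(2, 2, 4)); // true  -> rescale would trigger
    }
}
```

With the buggy check the STM sees 2 free slots against 4 desired and stabilizes, which is exactly why the job keeps running with a parallelism of 2 until the next onTrigger.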

On the test code's side, the following things happen after job submission:
# The test waits for the checkpoint to be created (item 5 above).
# Next, we wait for a job parallelism of 2. That already happens in item 4
above; by the time we check, we're past item 5 and in Restarting state. But
Restarting does contain the ExecutionGraph of the previous Executing state
(Restarting implements StateWithExecutionGraph), which is returned, so the
test proceeds with a checkpoint count > 0.
# Next, we check the ExecutionGraph history for the existence of checkpoint
#1, which fails because the returned CheckpointStatsSnapshot doesn't contain
any checkpoint history. This is because of the hiccup with
WaitingForResources (items 8 and 9 above). The WaitingForResources state
keeps the previousExecutionGraph but doesn't return it because it implements
StateWithoutExecutionGraph. Instead, it returns a dummy
ArchivedExecutionGraph instance (see
[StateWithoutExecutionGraph#getJob|https://github.com/apache/flink/blob/4aebc43721873480dfbbaf2cfe784c3833f1cdc9/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java#L58])
that doesn't include the checkpoint history.
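The effect on the test side can be sketched like this. It is a simplified, hypothetical model (the class and method names below are illustrative, not actual Flink classes) of why a state without an ExecutionGraph yields an empty checkpoint history, and why the test's index access then fails with `IndexOutOfBoundsException: Index: -1` as seen in the stack trace:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the two state flavors: states holding an
// ExecutionGraph expose real checkpoint stats, states without one hand out
// a dummy snapshot whose history is empty. Not actual Flink classes.
public class CheckpointHistorySketch {

    interface SchedulerState {
        List<String> checkpointHistory(); // stands in for CheckpointStatsSnapshot's history
    }

    // Analogous to a StateWithExecutionGraph (e.g. Executing, Restarting):
    // the previous ExecutionGraph, and with it the history, is returned.
    static class StateWithGraph implements SchedulerState {
        public List<String> checkpointHistory() {
            return List.of("checkpoint-1");
        }
    }

    // Analogous to a StateWithoutExecutionGraph (e.g. WaitingForResources):
    // a dummy ArchivedExecutionGraph is built, so the history is empty.
    static class StateWithoutGraph implements SchedulerState {
        public List<String> checkpointHistory() {
            return Collections.emptyList();
        }
    }

    static String latestCheckpoint(SchedulerState state) {
        List<String> history = state.checkpointHistory();
        // Mirrors the failing test access: on an empty history this computes
        // index -1 and Collections$EmptyList.get throws IndexOutOfBoundsException.
        return history.get(history.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(latestCheckpoint(new StateWithGraph())); // prints checkpoint-1
        try {
            latestCheckpoint(new StateWithoutGraph());
        } catch (IndexOutOfBoundsException e) {
            System.out.println("empty history: " + e.getMessage());
        }
    }
}
```

This matches the reported failure: the test reads the last entry of an empty list, which is exactly the `Collections$EmptyList.get` frame with `Index: -1` in the stack trace below.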



> AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale failed with
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-36295
>                 URL: https://issues.apache.org/jira/browse/FLINK-36295
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 2.0-preview
>            Reporter: Matthias Pohl
>            Assignee: Matthias Pohl
>            Priority: Blocker
>              Labels: test-stability
>         Attachments: 
> FLINK-36295.failure.62156.20240916.1.logs-cron_jdk17-test_cron_jdk17_core-1726454552.log,
>  FLINK-36295.failure.with-revert.debug.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62156&view=logs&j=675bf62c-8558-587e-2555-dcad13acefb5&t=5878eed3-cc1e-5b12-1ed0-9e7139ce0992&l=10234
> {code}
> Sep 16 03:06:30 03:06:30.168 [ERROR] Tests run: 3, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 5.275 s <<< FAILURE! -- in 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase
> Sep 16 03:06:30 03:06:30.168 [ERROR] 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale
>  -- Time elapsed: 0.676 s <<< ERROR!
> Sep 16 03:06:30 java.lang.IndexOutOfBoundsException: Index: -1
> Sep 16 03:06:30       at 
> java.base/java.util.Collections$EmptyList.get(Collections.java:4586)
> Sep 16 03:06:30       at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale(AdaptiveSchedulerClusterITCase.java:214)
> Sep 16 03:06:30       at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Sep 16 03:06:30       at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Sep 16 03:06:30
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
