[ https://issues.apache.org/jira/browse/FLINK-21233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-21233:
--------------------------------------
    Description: 
I'm writing an integration test and see an intermittent failure (roughly 1 in
100 runs on my machine):
{code:java}
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: CheckpointCoordinator shutdown.
{code}
 The test uses a very small graph and takes a synchronous savepoint:
{code:java}
client.submitJob(jobGraph).get(); // InfiniteTestSource -> BoundedPassThroughOperator -> DiscardingSink, dop = 1

BoundedPassThroughOperator.getProgressLatch().await();
InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
Thread.sleep(500); // await checkpoint start (no explicit signals, to avoid deadlocks)
InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
BoundedPassThroughOperator.allowSnapshots();
stop.get(); // <-- completion exception (sometimes)
{code}
 This is what I believe happens in the final stage of the savepoint:
 # The last subtask ACKs the checkpoint
 # CheckpointCoordinator finalizes the checkpoint and sends out confirmations
 # EndOfPartition is generated on sources and flows through the graph
 # Each Subtask notifies the Scheduler about its completion
 # Upon receiving the last notification, the Scheduler shuts down the CheckpointCoordinator
 # The CheckpointCoordinator aborts all pending checkpoints

Note that the Scheduler and the CheckpointCoordinator run in different threads.

So if savepoint finalization takes long enough, the pending savepoint can be 
aborted before it completes.
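
Below is a minimal, self-contained sketch of the suspected race (hypothetical PendingSavepoint class and thread setup, not Flink's actual CheckpointCoordinator/Scheduler code). It shows how the future returned by stop-with-savepoint completes exceptionally when the scheduler-side shutdown aborts the pending savepoint before the (slower) finalization gets to complete it:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for the coordinator-side state; not Flink classes.
public class SyncSavepointRace {

    static class PendingSavepoint {
        final CompletableFuture<String> result = new CompletableFuture<>();

        void finalizeSavepoint(String path) {
            // Step 2 of the list above: finalization completes the savepoint.
            result.complete(path);
        }

        void abort(String reason) {
            // Step 6: shutdown aborts all pending checkpoints/savepoints.
            result.completeExceptionally(new IllegalStateException(reason));
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService coordinatorThread = Executors.newSingleThreadScheduledExecutor();
        PendingSavepoint savepoint = new PendingSavepoint();

        // Coordinator thread: the last subtask has ACKed, finalization is in flight
        // but takes a while (e.g. writing metadata to the savepoint directory).
        coordinatorThread.schedule(
                () -> savepoint.finalizeSavepoint("file:///tmp/savepoint-1"),
                100, TimeUnit.MILLISECONDS);

        // Scheduler thread (here: main): EndOfPartition has propagated, all subtasks
        // reported FINISHED, so the coordinator is shut down -- before finalization ran.
        savepoint.abort("CheckpointCoordinator shutdown.");

        try {
            savepoint.result.join(); // what stop.get() observes in the test
        } catch (CompletionException e) {
            System.out.println("stop-with-savepoint failed: " + e.getCause().getMessage());
        } finally {
            coordinatorThread.shutdown();
        }
    }
}
{code}
If finalization wins the race instead (complete() runs before the abort), the same sketch returns the savepoint path, which matches the intermittent nature of the failure.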

> Race condition in CheckpointCoordinator in finishing sync savepoint
> -------------------------------------------------------------------
>
>                 Key: FLINK-21233
>                 URL: https://issues.apache.org/jira/browse/FLINK-21233
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1, 1.13.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
