[ 
https://issues.apache.org/jira/browse/FLINK-25305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459219#comment-17459219
 ] 

Yun Gao edited comment on FLINK-25305 at 12/14/21, 3:52 PM:
------------------------------------------------------------

We would need to:
1. Always include the input channel state and result partition state, no
matter the status of the operators and the tasks.
2. Always wait for these states to complete in the asynchronous part.

If a task is fully finished, it cannot have actual inputs or outputs, so we
can still discard the results of these state futures.
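The two points above can be sketched with plain `java.util.concurrent.CompletableFuture`. This is a hypothetical illustration, not Flink's `AsyncCheckpointRunnable`: the class name, the `awaitStates` method, and modelling each state as a `CompletableFuture<String>` are all assumptions. The channel state futures are always collected and awaited, and for a fully finished task their results are discarded afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the proposed change; not Flink's AsyncCheckpointRunnable. */
public class AlwaysWaitForChannelStateSketch {

    /**
     * Waits for every state future of a checkpoint and returns the results to report.
     * The input channel and result partition futures are always included, whether or
     * not the task has fully finished.
     */
    static List<String> awaitStates(
            List<CompletableFuture<String>> operatorStates,
            CompletableFuture<String> inputChannelState,
            CompletableFuture<String> resultPartitionState,
            boolean taskFullyFinished) {
        List<CompletableFuture<String>> all = new ArrayList<>(operatorStates);
        // Point 1: always include the channel state futures.
        all.add(inputChannelState);
        all.add(resultPartitionState);
        // Point 2: always block in the asynchronous part until every future is done.
        CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).join();

        List<String> results = new ArrayList<>();
        if (!taskFullyFinished) {
            for (CompletableFuture<String> future : all) {
                results.add(future.join());
            }
        }
        // A fully finished task has no actual inputs or outputs, so its results are
        // discarded; only the waiting matters.
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> inputChannels = new CompletableFuture<>();
        CompletableFuture<String> resultPartition = CompletableFuture.completedFuture("partition");
        // Simulates the last barrier arriving later, on another thread.
        new Thread(() -> inputChannels.complete("channels")).start();
        List<String> kept = awaitStates(new ArrayList<>(), inputChannels, resultPartition, true);
        System.out.println("input channels done: " + inputChannels.isDone() + ", kept: " + kept);
    }
}
```

Waiting and discarding are kept as separate steps on purpose: the wait is what preserves the "last barrier has arrived" guarantee, while dropping the results is safe only because a finished task has nothing to persist.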



was (Author: gaoyunhaii):
We would need to:
1. Always include the input channel state and result partition state, no
matter the status of the operators and the tasks.
2. Always wait for these states to complete in the asynchronous part.



> Always wait for input channel state and result partition state get completed 
> in AsyncRunnable
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25305
>                 URL: https://issues.apache.org/jira/browse/FLINK-25305
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>             Fix For: 1.15.0
>
>
>  
> {code:java}
> 29245 [jobmanager-io-thread-12] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
> Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
>     at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
>     at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
>     at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
>     ... 3 more
> {code}
>  
> When both unaligned checkpoints and final checkpoints are enabled, some
> checkpoints fail in the asynchronous phase with the exception above,
> indicating that the checkpoint metric futures have not all been completed.
> The exception is likely caused by the following: when a task is restored
> from a state in which it had already fully finished, taking a checkpoint
> skips snapshotting the state of its operators. In particular, the
> InputChannelStates and the ResultPartitionState attached to the operator are
> not included either. With unaligned checkpoints, this leads to the following
> bad case:
> 1. The task receives the first barrier.
> 2. As part of the unaligned checkpoint, the task snapshots the state of the
> operators.
> 3. The checkpoint starts its asynchronous part.
> 4. Normally, the asynchronous part waits until all state futures are done,
> including the channel states and result partition states. This ensures that
> the asynchronous part waits until the last barrier has arrived. But if the
> task had previously fully finished, these states are ignored and the
> assumption is broken.
> 5. The asynchronous part then fails when it tries to build the
> CheckpointMetrics, because the alignment for this checkpoint is in fact not
> done yet.
>  
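The five steps in the description can be modelled in miniature with plain `java.util.concurrent.CompletableFuture`s. This is a hypothetical simulation, not Flink code: the class, the `alignmentDuration` future, and the `checkStateAndGet` helper are stand-ins. The helper only mirrors what the bottom of the stack trace suggests, namely that the metrics builder checks a future has already completed normally instead of blocking on it. The alignment future completes when the last barrier arrives, and the asynchronous part builds the metrics as soon as the futures it was handed are done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical miniature of the bad case; not Flink code. */
public class FinishedTaskRaceSketch {

    /** Stand-in for the completed-normally check seen in the stack trace. */
    static <T> T checkStateAndGet(CompletableFuture<T> future) {
        if (!future.isDone() || future.isCompletedExceptionally()) {
            throw new IllegalStateException("future is not completed normally");
        }
        return future.join();
    }

    /**
     * Runs steps 1-5 and returns the alignment duration read while building the metrics.
     * Throws CompletionException (cause: IllegalStateException) if alignment is not done.
     */
    static long buildMetrics(boolean includeChannelStates) {
        // Completed only when the last barrier arrives.
        CompletableFuture<Long> alignmentDuration = new CompletableFuture<>();
        // With unaligned checkpoints, channel state is complete only after the last barrier.
        CompletableFuture<String> channelState =
                alignmentDuration.thenApply(ignored -> "channel state");

        // Steps 1-3: first barrier received, operators snapshotted, async part started.
        // A previously finished task hands over no channel state futures.
        List<CompletableFuture<?>> awaited = new ArrayList<>();
        if (includeChannelStates) {
            awaited.add(channelState);
        }

        // Steps 4-5: wait only for the awaited futures, then build the metrics.
        CompletableFuture<Long> metrics =
                CompletableFuture.allOf(awaited.toArray(new CompletableFuture<?>[0]))
                        .thenApply(ignored -> checkStateAndGet(alignmentDuration));

        // The last barrier arrives after the asynchronous part has started.
        alignmentDuration.complete(10L);
        return metrics.join();
    }

    public static void main(String[] args) {
        System.out.println("channel states included: " + buildMetrics(true));
        try {
            buildMetrics(false);
        } catch (CompletionException e) {
            System.out.println("previously finished task: " + e.getCause());
        }
    }
}
```

With `includeChannelStates = true` the metrics step runs only after the last barrier and reads the alignment duration. With `false`, as for a previously finished task, the awaited list is empty, so the step runs immediately and fails with an `IllegalStateException`, analogous to the `Preconditions.checkState` failure in the trace.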



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
