[ https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ming li updated FLINK-21990:
----------------------------
    Summary: SourceStreamTask will always hang if the 
CheckpointedFunction#snapshotState throws an exception.  (was: Double check 
Task status when perform checkpoint.)

> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState 
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: ming li
>            Priority: Major
>
> We need to double-check the task status when performing a checkpoint. 
> Otherwise, a checkpoint may still complete successfully after the task has 
> failed.
> For example, I throw an exception at 17:10:24.069; the checkpoint lock is 
> acquired and the checkpoint starts at 17:10:24.070; and the checkpoint 
> completes at 17:10:24.373.
> {code:java}
> 17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed 
> (2/4)- execution # 0] INFO  
> org.apache.flink.test.checkpointing.RegionCheckpointITCase  - throw expected 
> exception
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO 
>  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution 
> # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO 
>  org.apache.flink.test.checkpointing.RegionCheckpointITCase  - sleep 300 ms
> 17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO 
>  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution 
> # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.373 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
> checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
> {code}
> Looking at the code, the task status is checked only once, at the beginning; 
> once the lock has been acquired, the checkpoint is performed directly without 
> checking the status again.
> {code:java}
> private boolean performCheckpoint(
>         CheckpointMetaData checkpointMetaData,
>         CheckpointOptions checkpointOptions,
>         CheckpointMetricsBuilder checkpointMetrics)
>         throws Exception {
>     if (isRunning) {
>         actionExecutor.runThrowing(
>                 () -> {
>                     // do checkpoint
>                 });
>         return true;
>     } else {
>         ...
>     }
> }{code}
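The window described above can be reproduced outside Flink. The following is a minimal plain-Java sketch, not Flink code: `CheckThenLockRace`, `lock`, `isRunning` and `performCheckpoint` are hypothetical stand-ins for the task's checkpoint lock, running flag and checkpoint path. The main thread holds the lock, waits until the checkpoint thread has passed the status check and is blocked on the lock, and only then marks the task as stopped. Because the status is read only once, before the lock is taken, the checkpoint still goes through.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, Flink-independent model of the check-then-lock pattern.
class CheckThenLockRace {
    private final Object lock = new Object();   // stand-in for the checkpoint lock
    private volatile boolean isRunning = true;  // stand-in for the task's running flag

    /** Status is checked once, before the lock is acquired. */
    boolean performCheckpoint() {
        if (isRunning) {
            synchronized (lock) {
                // do checkpoint: runs even if the task stopped while we waited
                return true;
            }
        }
        return false;
    }

    /** Stops the task while a checkpoint thread is blocked on the lock; returns whether it completed. */
    static boolean checkpointCompletesAfterFailure() {
        CheckThenLockRace task = new CheckThenLockRace();
        AtomicBoolean completed = new AtomicBoolean();
        Thread checkpointer = new Thread(() -> completed.set(task.performCheckpoint()));
        synchronized (task.lock) {
            checkpointer.start();
            // Wait until the checkpoint thread has passed the status check and is blocked on the lock.
            while (checkpointer.getState() != Thread.State.BLOCKED) {
                Thread.onSpinWait();
            }
            task.isRunning = false; // the task fails while the checkpoint waits for the lock
        }
        try {
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("checkpoint completed after failure: " + checkpointCompletesAfterFailure());
    }
}
```

Running `main` prints `checkpoint completed after failure: true`: the single status check cannot see a failure that happens while the thread is waiting for the lock.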
> However, the task status may change while the thread is waiting for the lock. 
> By contrast, in Flink 1.9 the task status is checked after the lock has been 
> acquired.
> {code:java}
> private boolean performCheckpoint(
>       CheckpointMetaData checkpointMetaData,
>       CheckpointOptions checkpointOptions,
>       CheckpointMetrics checkpointMetrics,
>       boolean advanceToEndOfTime) throws Exception {
>    LOG.debug("Starting checkpoint ({}) {} on task {}",
>       checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
>    final long checkpointId = checkpointMetaData.getCheckpointId();
>    synchronized (lock) {
>       if (isRunning) {
>          // do checkpoint
>       } else {
>          ...
>       }
>    }
> }{code}
> Therefore, I think we need to re-check the task status after acquiring the 
> lock, so that a task that fails while the checkpoint is waiting for the lock 
> cannot still complete that checkpoint.
>  
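A minimal sketch of the proposed double check, under the same assumptions as a plain-Java model (hypothetical names `DoubleCheckedCheckpoint`, `lock`, `isRunning`; not the actual `StreamTask` code): the status is read once before taking the lock as a cheap fast path, and again after the lock is held. In the same scenario, where the task fails while the checkpoint thread is blocked on the lock, the checkpoint is now declined.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, Flink-independent model of checking the status before and after taking the lock.
class DoubleCheckedCheckpoint {
    private final Object lock = new Object();   // stand-in for the checkpoint lock
    private volatile boolean isRunning = true;  // stand-in for the task's running flag

    boolean performCheckpoint() {
        if (!isRunning) {
            return false; // task already stopped: decline without contending for the lock
        }
        synchronized (lock) {
            if (!isRunning) {
                return false; // task stopped while we waited for the lock: decline
            }
            // do checkpoint: the status was re-read after the lock was acquired
            return true;
        }
    }

    /** Stops the task while a checkpoint thread is blocked on the lock; returns whether it completed. */
    static boolean checkpointCompletesAfterFailure() {
        DoubleCheckedCheckpoint task = new DoubleCheckedCheckpoint();
        AtomicBoolean completed = new AtomicBoolean();
        Thread checkpointer = new Thread(() -> completed.set(task.performCheckpoint()));
        synchronized (task.lock) {
            checkpointer.start();
            // Wait until the checkpoint thread has passed the first check and is blocked on the lock.
            while (checkpointer.getState() != Thread.State.BLOCKED) {
                Thread.onSpinWait();
            }
            task.isRunning = false; // the task fails while the checkpoint waits for the lock
        }
        try {
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("checkpoint completed after failure: " + checkpointCompletesAfterFailure());
    }
}
```

Running `main` prints `checkpoint completed after failure: false`. Note that the second check only fully closes the window if the failure path updates `isRunning` in a way that is ordered with the lock (in this sketch it is set while the lock is held); whether that holds in `SourceStreamTask` would need to be verified against the actual code.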



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
