[
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ming li updated FLINK-21990:
----------------------------
Summary: SourceStreamTask will always hang if the
CheckpointedFunction#snapshotState throws an exception. (was: Double check
Task status when perform checkpoint.)
> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-21990
> URL: https://issues.apache.org/jira/browse/FLINK-21990
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: ming li
> Priority: Major
>
> We need to double check the task status when performing a checkpoint.
> Otherwise, a checkpoint may still complete successfully after the task has
> failed.
> For example, in the log below I throw an exception at 17:10:24.069, the lock
> is acquired at 17:10:24.070 and the checkpoint starts, and the checkpoint
> finishes at 17:10:24.373.
> {code:java}
> 17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed
> (2/4)- execution # 0] INFO
> org.apache.flink.test.checkpointing.RegionCheckpointITCase - throw expected
> exception
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
> org.apache.flink.runtime.state.AbstractSnapshotStrategy -
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
> part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution
> # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
> org.apache.flink.test.checkpointing.RegionCheckpointITCase - sleep 300 ms
> 17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
> org.apache.flink.runtime.state.AbstractSnapshotStrategy -
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
> part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution
> # 0,5,Flink Task Threads] took 0 ms.
> 17:10:24.373 [jobmanager-future-thread-1] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
> {code}
> In the code, the task state is checked only once, at the beginning. Once the
> lock is obtained, the checkpoint starts directly without checking the state
> again.
> {code:java}
> private boolean performCheckpoint(
>         CheckpointMetaData checkpointMetaData,
>         CheckpointOptions checkpointOptions,
>         CheckpointMetricsBuilder checkpointMetrics)
>         throws Exception {
>     // isRunning is checked once, before the lock is obtained.
>     if (isRunning) {
>         actionExecutor.runThrowing(
>                 () -> {
>                     // do checkpoint
>                 });
>         return true;
>     } else {
>         ...
>     }
> }
> {code}
> However, the task state may well change while the lock is being acquired.
> By comparison, the Flink 1.9 code checks the task status after acquiring the
> lock.
> {code:java}
> private boolean performCheckpoint(
>         CheckpointMetaData checkpointMetaData,
>         CheckpointOptions checkpointOptions,
>         CheckpointMetrics checkpointMetrics,
>         boolean advanceToEndOfTime) throws Exception {
>     LOG.debug("Starting checkpoint ({}) {} on task {}",
>             checkpointMetaData.getCheckpointId(),
>             checkpointOptions.getCheckpointType(), getName());
>     final long checkpointId = checkpointMetaData.getCheckpointId();
>     synchronized (lock) {
>         // isRunning is checked while the lock is held.
>         if (isRunning) {
>             // do checkpoint
>         } else {
>             ...
>         }
>     }
> }
> {code}
> Therefore, I think we need to double check the task status after acquiring
> the lock, to avoid the situation where the task fails while the lock is being
> acquired but the Checkpoint still succeeds.
>
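
The race described in the report can be reproduced in isolation. The following is a minimal sketch, not Flink code: the class DoubleCheckSketch, its methods, and the latch used to force the interleaving are all hypothetical names introduced for illustration. It assumes the checkpoint thread passes the first isRunning check, then waits for a lock held by another thread, and that the task fails during that wait. It is not meant to represent the actual fix applied for this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a task; none of these names are Flink APIs.
public class DoubleCheckSketch {
    private final ReentrantLock lock = new ReentrantLock();
    // Signals that the checkpoint thread has passed the first isRunning check.
    private final CountDownLatch passedFirstCheck = new CountDownLatch(1);
    private volatile boolean isRunning = true;

    // Returns true if the "checkpoint" was performed.
    boolean performCheckpoint(boolean recheckUnderLock) {
        if (!isRunning) {
            return false; // first check, made before the lock is acquired
        }
        passedFirstCheck.countDown();
        lock.lock(); // may block while another thread holds the lock
        try {
            if (recheckUnderLock && !isRunning) {
                return false; // second check: the task failed while we waited
            }
            return true; // "do checkpoint"
        } finally {
            lock.unlock();
        }
    }

    // Forces the interleaving from the report: the task fails after the first
    // check but before the checkpoint thread obtains the lock.
    public static boolean simulate(boolean recheckUnderLock) throws InterruptedException {
        DoubleCheckSketch task = new DoubleCheckSketch();
        boolean[] result = new boolean[1];
        Thread checkpointThread =
                new Thread(() -> result[0] = task.performCheckpoint(recheckUnderLock));

        task.lock.lock(); // this thread plays the failing source thread
        try {
            checkpointThread.start();
            task.passedFirstCheck.await(); // first check has already succeeded
            task.isRunning = false;        // the task fails while holding the lock
        } finally {
            task.lock.unlock();
        }
        checkpointThread.join();
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("check before lock only: " + simulate(false));
        System.out.println("re-check under lock:    " + simulate(true));
    }
}
```

Under these assumptions, the variant that checks only before taking the lock still performs the checkpoint (simulate(false) returns true), while the variant that re-checks under the lock skips it (simulate(true) returns false).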