[
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ming li updated FLINK-21990:
----------------------------
Description:
If the source in a SourceStreamTask implements {{CheckpointedFunction}} and its
snapshotState method throws an exception, the SourceStreamTask hangs
indefinitely.
The main reason is that the checkpoint is executed in the mailbox. When
CheckpointedFunction#snapshotState of the source throws an exception,
StreamTask#cleanUpInvoke is called, which waits for the
{{LegacySourceFunctionThread}} of the source to finish. However, the source
thread does not end by itself (ending it is left to the user), so the Task
hangs at this point, and the JobMaster is not aware of it.
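The hang described above can be reproduced in miniature with plain Java
threads. This is only a sketch under stated assumptions: SourceHangSketch,
LoopingSource, snapshotState and cleanUpAfterSnapshotFailure are hypothetical
names, not Flink APIs, and the bounded join stands in for the unbounded wait
in the real cleanup so that the demo terminates. Cancelling the source before
waiting is shown only to illustrate why the wait never returns otherwise, not
as the fix chosen for this issue.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java model of the hang; none of these names are Flink APIs. */
final class SourceHangSketch {

    /** Stand-in for a legacy source whose run loop ends only when cancel() is called. */
    static final class LoopingSource implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(true);

        @Override
        public void run() {
            while (running.get()) {
                try {
                    Thread.sleep(10); // pretend to emit records
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            running.set(false);
        }
    }

    /** Hypothetical stand-in for a snapshotState call that fails. */
    static void snapshotState() {
        throw new IllegalStateException("simulated snapshotState failure");
    }

    /**
     * Simulates a failed snapshot followed by cleanup that waits for the source thread.
     * Returns true if the source thread ended within timeoutMillis.
     */
    static boolean cleanUpAfterSnapshotFailure(boolean cancelSourceFirst, long timeoutMillis) {
        LoopingSource source = new LoopingSource();
        Thread sourceThread = new Thread(source, "legacy-source-sketch");
        sourceThread.setDaemon(true); // lets the demo JVM exit even if the thread never ends
        sourceThread.start();

        try {
            snapshotState();
        } catch (IllegalStateException e) {
            if (cancelSourceFirst) {
                source.cancel(); // without this, nothing tells the source loop to stop
            }
        }

        try {
            // Assumption: the real cleanup waits without a timeout; bounded here for the demo.
            sourceThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !sourceThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("source ended without cancel: "
                + cleanUpAfterSnapshotFailure(false, 200));
        System.out.println("source ended with cancel: "
                + cleanUpAfterSnapshotFailure(true, 2000));
    }
}
```

With an unbounded join in place of the timed one, the first call would never
return, which mirrors the Task hanging in cleanup while the source thread
keeps running.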
was:
We need to double-check the Task status when making a Checkpoint. Otherwise,
after a Task has failed, the checkpoint may still complete successfully.
For example, I throw an exception at 17:10:24.069, the lock is obtained at
17:10:24.070 and the Checkpoint starts, and the Checkpoint finishes at
17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed
(2/4)- execution # 0] INFO
org.apache.flink.test.checkpointing.RegionCheckpointITCase - throw expected
exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
org.apache.flink.runtime.state.AbstractSnapshotStrategy -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution #
0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
org.apache.flink.test.checkpointing.RegionCheckpointITCase - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO
org.apache.flink.runtime.state.AbstractSnapshotStrategy -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution #
0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
{code}
From the code point of view, we only check the state of the task at the
beginning; once the lock is obtained, we directly start making the
Checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
    // isRunning is checked here, before the lock is acquired
    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {
                    // do checkpoint
                });
        return true;
    } else {
        ...
    }
}
{code}
However, the task state may well change while the lock is being acquired. By
comparison, the Flink 1.9 code checks the task status after acquiring the
lock.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getCheckpointType(), getName());
    final long checkpointId = checkpointMetaData.getCheckpointId();
    synchronized (lock) {
        // isRunning is checked while the lock is held
        if (isRunning) {
            // do checkpoint
        } else {
            ...
        }
    }
}
{code}
Therefore, I think we need to check the task status again after acquiring the
lock, to avoid the situation where the task fails while the lock is being
acquired but the Checkpoint still succeeds.
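The proposed double check can be sketched in plain Java as follows. This is
a minimal model, not Flink code: DoubleCheckSketch and its methods are
hypothetical names, and the whileWaitingForLock hook is an assumption used to
simulate, deterministically, the task failing between the first status check
and the moment the lock is obtained.

```java
/** Plain-Java model of checking task status before vs. after taking the lock. */
final class DoubleCheckSketch {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    /** Marks the modelled task as failed. */
    void fail() {
        synchronized (lock) {
            isRunning = false;
        }
    }

    /** Checks the status only before the lock; returns true if the checkpoint was made. */
    boolean checkpointCheckBeforeLockOnly(Runnable whileWaitingForLock) {
        if (!isRunning) {
            return false;
        }
        whileWaitingForLock.run(); // models a state change while waiting for the lock
        synchronized (lock) {
            // do checkpoint, even though the task may have failed in the meantime
            return true;
        }
    }

    /** Checks the status again once the lock is held. */
    boolean checkpointWithRecheck(Runnable whileWaitingForLock) {
        if (!isRunning) {
            return false;
        }
        whileWaitingForLock.run();
        synchronized (lock) {
            if (!isRunning) {
                return false; // the task failed while the lock was being acquired
            }
            // do checkpoint
            return true;
        }
    }

    public static void main(String[] args) {
        DoubleCheckSketch a = new DoubleCheckSketch();
        System.out.println("check before lock only: " + a.checkpointCheckBeforeLockOnly(a::fail));

        DoubleCheckSketch b = new DoubleCheckSketch();
        System.out.println("with re-check under lock: " + b.checkpointWithRecheck(b::fail));
    }
}
```

The first variant still reports a checkpoint after the task has failed, while
the second declines it because the status is read again under the lock.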
> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-21990
> URL: https://issues.apache.org/jira/browse/FLINK-21990
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: ming li
> Priority: Major
>
> If the source in a SourceStreamTask implements {{CheckpointedFunction}} and
> its snapshotState method throws an exception, the SourceStreamTask hangs
> indefinitely.
> The main reason is that the checkpoint is executed in the mailbox. When
> CheckpointedFunction#snapshotState of the source throws an exception,
> StreamTask#cleanUpInvoke is called, which waits for the
> {{LegacySourceFunctionThread}} of the source to finish. However, the source
> thread does not end by itself (ending it is left to the user), so the Task
> hangs at this point, and the JobMaster is not aware of it.
>