[ 
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ming li updated FLINK-21990:
----------------------------
    Description: 
If the source in SourceStreamTask implements {{CheckpointedFunction}} and an 
exception is thrown in the snapshotState method, then the SourceStreamTask will 
always hang.

The main reason is that the checkpoint is executed in the mailbox. When the 
CheckpointedFunction#snapshotState  of the source throws an exception, the 
StreamTask#cleanUpInvoke will be called, where it will wait for the end of the 
{{LegacySourceFunctionThread}} of the source. However, the source thread does 
not end by itself (this requires the user to control it), the Task will hang at 
this time, and the JobMaster has no perception of this behavior.

 

  was:
We need to double check Task status when making Checkpoint. Otherwise, after a 
Task failed, the checkpoint may still be made successfully.

For example, I try to throw an exception at 17:10:24.069, get the lock at 
17:10:24.070 and start making Checkpoint, and finish making Checkpoint at 
17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed 
(2/4)- execution # 0] INFO  
org.apache.flink.test.checkpointing.RegionCheckpointITCase  - throw expected 
exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 
0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.test.checkpointing.RegionCheckpointITCase  - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  
org.apache.flink.runtime.state.AbstractSnapshotStrategy  - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 
0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).

{code}
>From the code point of view, we only judged the state of the task at the 
>beginning, and when the lock was obtained, we directly started to make the 
>Checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {//do checkpoint});
        return true;
    } else {
        ...
    }
}{code}
However, during the period of acquiring the lock, the task state is likely to 
change. Compared with the Flink 1.9 version code, the 1.9 version judges the 
task status after acquiring the lock.
{code:java}
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics,
      boolean advanceToEndOfTime) throws Exception {

   LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(), 
checkpointOptions.getCheckpointType(), getName());

   final long checkpointId = checkpointMetaData.getCheckpointId();

   synchronized (lock) {
      if (isRunning) {
         //do checkpoint
      } else {
        ...
      }
 }{code}
Therefore, I think we need to double check the task status to avoid the 
situation where the task fails but the Checkpoint can still succeed in the 
process of acquiring the lock.

 


> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState 
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: ming li
>            Priority: Major
>
> If the source in SourceStreamTask implements {{CheckpointedFunction}} and an 
> exception is thrown in the snapshotState method, then the SourceStreamTask 
> will always hang.
> The main reason is that the checkpoint is executed in the mailbox. When the 
> CheckpointedFunction#snapshotState  of the source throws an exception, the 
> StreamTask#cleanUpInvoke will be called, where it will wait for the end of 
> the {{LegacySourceFunctionThread}} of the source. However, the source thread 
> does not end by itself (this requires the user to control it), the Task will 
> hang at this time, and the JobMaster has no perception of this behavior.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to