Thanks for the update, @yanghua. Looking at the checkpoint coordinator more 
deeply, I think we first need to work a bit more on the design for this kind of
change. We have to take into account at least the following points (roughly):

- Introduce a separate component/class responsible for failure management and
  counting, e.g. `CheckpointFailureManager` or something similar

- The job manager should:
    - construct the `CheckpointFailureManager`
    - configure it with the max failure count and an appropriate action that
      defines how to fail the job
    - pass it to the `CheckpointCoordinator`

- `CheckpointCoordinator`
    - should give callbacks to `CheckpointFailureManager` about both failures and
      successes of checkpoints
    - needs some refactoring to better distinguish the failure causes of a
      `PendingCheckpoint`. Currently, the `abortXXX()` methods do not provide
      enough information to decide whether a failure should be counted by
      `CheckpointFailureManager` (as we already have for `triggerSavepoint` via
      `CheckpointDeclineReason`). In the end, there should be a clear place in
      `CheckpointCoordinator` from which to give callbacks to
      `CheckpointFailureManager`, e.g.:
        - a `CheckpointDeclineReason.EXCEPTION` result of `triggerSavepoint`
        - some cases of `PendingCheckpoint.abortDeclined()`, `abortError()`, and
          maybe `abortExpired()`

- Consider keeping only `DecliningCheckpointExceptionHandler` on the
  `TaskExecutor` side and handling all failure cases in `CheckpointCoordinator`
  alone
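To make the proposed split more concrete, here is a rough, self-contained Java sketch of how the pieces could interact. It is not Flink code: apart from the `CheckpointFailureManager` name proposed above, everything in it (`CheckpointFailureReason`, `CoordinatorSketch`, `onCheckpointSuccess`/`onCheckpointFailure`, the `Consumer<String>` fail action) is hypothetical. It also assumes that the manager counts consecutive failures and that a successful checkpoint resets the counter; which abort reasons should count (especially expiration) is exactly what still needs to be decided.

```java
import java.util.function.Consumer;

// Hypothetical: why a pending checkpoint was aborted, and whether that counts as a failure.
enum CheckpointFailureReason {
    TRIGGER_EXCEPTION(true), // e.g. CheckpointDeclineReason.EXCEPTION when triggering
    TASK_DECLINED(true),     // some cases of PendingCheckpoint.abortDeclined()
    ERROR(true),             // abortError()
    EXPIRED(true),           // abortExpired(); whether this should count is still open
    SUBSUMED(false);         // superseded by a newer checkpoint: not a real failure

    final boolean countsAsFailure;

    CheckpointFailureReason(boolean countsAsFailure) {
        this.countsAsFailure = countsAsFailure;
    }
}

// Hypothetical failure manager: counts consecutive counted failures and runs the
// configured fail-job action once the count exceeds the tolerated maximum.
final class CheckpointFailureManager {
    private final int maxFailures;
    private final Consumer<String> failJobAction;
    private int continuousFailures;
    private boolean jobFailureTriggered;

    CheckpointFailureManager(int maxFailures, Consumer<String> failJobAction) {
        if (maxFailures < 0) {
            throw new IllegalArgumentException("maxFailures must be >= 0");
        }
        this.maxFailures = maxFailures;
        this.failJobAction = failJobAction;
    }

    // Callback from the coordinator when a checkpoint completes: reset the counter.
    synchronized void onCheckpointSuccess(long checkpointId) {
        continuousFailures = 0;
    }

    // Callback from the coordinator when a pending checkpoint is aborted.
    synchronized void onCheckpointFailure(long checkpointId, CheckpointFailureReason reason) {
        if (!reason.countsAsFailure) {
            return; // ignored reasons neither increment nor reset the counter
        }
        continuousFailures++;
        if (continuousFailures > maxFailures && !jobFailureTriggered) {
            jobFailureTriggered = true; // run the fail-job action only once
            failJobAction.accept("Exceeded checkpoint failure tolerance of " + maxFailures
                    + "; last failure: checkpoint " + checkpointId + " (" + reason + ")");
        }
    }

    synchronized int getContinuousFailures() {
        return continuousFailures;
    }
}

// Stand-in for CheckpointCoordinator, showing only where the callbacks would go.
final class CoordinatorSketch {
    private final CheckpointFailureManager failureManager;

    CoordinatorSketch(CheckpointFailureManager failureManager) {
        this.failureManager = failureManager;
    }

    void completePendingCheckpoint(long checkpointId) {
        failureManager.onCheckpointSuccess(checkpointId);
    }

    void abortPendingCheckpoint(long checkpointId, CheckpointFailureReason reason) {
        failureManager.onCheckpointFailure(checkpointId, reason);
    }
}

class CheckpointFailureDemo {
    public static void main(String[] args) {
        // Job-manager side: construct the manager, configure the tolerance and
        // the fail action, then pass it to the coordinator.
        CheckpointFailureManager failureManager = new CheckpointFailureManager(
                1, cause -> System.out.println("Failing job: " + cause));
        CoordinatorSketch coordinator = new CoordinatorSketch(failureManager);

        coordinator.abortPendingCheckpoint(1L, CheckpointFailureReason.EXPIRED);       // count 1, tolerated
        coordinator.completePendingCheckpoint(2L);                                    // count reset to 0
        coordinator.abortPendingCheckpoint(3L, CheckpointFailureReason.TASK_DECLINED); // count 1, tolerated
        coordinator.abortPendingCheckpoint(4L, CheckpointFailureReason.ERROR);         // count 2 > 1, fails job
    }
}
```

Passing the fail action in as a callback keeps the decision of how to fail the job on the job manager side, while the coordinator only reports what happened to each checkpoint.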

There might be more points. I suggest we step back and continue the discussion
in the JIRA issue. Once we have a clear design, a PR can be opened again.


[ Full content available at: https://github.com/apache/flink/pull/6567 ]