[
https://issues.apache.org/jira/browse/FLINK-17350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090608#comment-17090608
]
Piotr Nowojski commented on FLINK-17350:
----------------------------------------
There are two potential solutions.
Option I.
We should just fail the task immediately on any failure in the synchronous
part of a checkpoint, as we did before
https://issues.apache.org/jira/browse/FLINK-4809 . This also includes failures
in {{notifyCheckpointCompleted}} calls.
Option II.
If we want to keep {{setTolerableCheckpointFailureNumber(...)}} for
synchronous failures, since some operators may be able to tolerate
snapshot/committing failures, the tolerance would have to be implemented inside
the operators. FlinkKafkaProducer would have to remember an exception and keep
re-throwing it, or throw "Rejecting writes because of a previous failure".
Alternatively, we could provide some API (an interface? a getter
{{boolean canTolerateCheckpointingFailure()}}? wrapping an exception with
{{FlinkAbortCheckpointException(...)}} and tolerating failures of only that
exception type?) for StreamTask to detect whether an operator can recover from
checkpoint failures, and implement this logic somewhere in
StreamTask/OperatorChain/OperatorWrapper.
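A minimal sketch of what the Option II operator-side API could look like. All names here ({{CheckpointFailureTolerant}}, {{TransactionalSink}}, the method names) are illustrative assumptions, not actual Flink interfaces:

```java
// Hypothetical sketch of the Option II API -- these are NOT real Flink types.
// StreamTask would consult the flag before deciding whether to fail the task.

interface CheckpointFailureTolerant {
    // Return true if this operator can safely continue after a failed snapshot.
    boolean canTolerateCheckpointingFailure();
}

// A Kafka-style exactly-once sink cannot tolerate synchronous snapshot
// failures: a lost/aborted transaction would silently drop records.
class TransactionalSink implements CheckpointFailureTolerant {
    private Exception firstFailure;

    @Override
    public boolean canTolerateCheckpointingFailure() {
        return false;
    }

    // Option II: remember the first failure and keep re-throwing it on every
    // subsequent write, so the error cannot be silently lost.
    void write(String record) {
        if (firstFailure != null) {
            throw new IllegalStateException(
                    "Rejecting writes because of a previous failure", firstFailure);
        }
        // ... perform the actual write ...
    }

    void onSnapshotFailure(Exception e) {
        if (firstFailure == null) {
            firstFailure = e;
        }
    }
}
```

This illustrates why Option II needs cooperation from each operator: only the operator itself knows whether a failed snapshot leaves it in a recoverable state.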
I'm strongly in favour of Option I, as it's much simpler, while Option II
doesn't seem to add much value and would require support from the operator side.
Asynchronous failures are acceptable, as the in-memory state of
operators/functions is kept consistent while waiting for the
{{notifyCheckpointCompleted}} call. From an operator's perspective,
ignoring an asynchronous failure of Checkpoint N and successfully completing
Checkpoint N+1 is indistinguishable from the {{notifyCheckpointCompleted}} for
Checkpoint N being lost in transit - a scenario that is valid and correctly
handled by our exactly-once sinks.
> StreamTask should always fail immediately on failures in synchronous part of
> a checkpoint
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-17350
> URL: https://issues.apache.org/jira/browse/FLINK-17350
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Task
> Affects Versions: 1.6.4, 1.7.2, 1.8.3, 1.9.2, 1.10.0
> Reporter: Piotr Nowojski
> Priority: Critical
> Fix For: 1.11.0
>
>
> This bug also affects the 1.5.x branch.
> As described in point 1 here:
> https://issues.apache.org/jira/browse/FLINK-17327?focusedCommentId=17090576&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17090576
> {{setTolerableCheckpointFailureNumber(...)}} and its deprecated
> predecessor {{setFailTaskOnCheckpointError(...)}} are implemented
> incorrectly. Since Flink 1.5
> (https://issues.apache.org/jira/browse/FLINK-4809) they can lead to operators
> (and especially sinks with external state) ending up in an inconsistent
> state. That's true even if they are not used, because of another issue:
> FLINK-17351
> If we combine this with an intermittent external system failure: the sink
> reports an exception, the transaction was lost/aborted, and the sink is in a
> failed state; but if, by unhappy coincidence, it manages to accept further
> records, the exception can be lost and all of the records in those failed
> checkpoints will be lost forever as well.
> For details please check FLINK-17327.