[ https://issues.apache.org/jira/browse/FLINK-17350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090608#comment-17090608 ]

Piotr Nowojski commented on FLINK-17350:
----------------------------------------

There are two potential solutions. 

Option I.

We should just fail the task immediately on any failure in the synchronous 
part of a checkpoint, as we did before 
https://issues.apache.org/jira/browse/FLINK-4809 . This also includes failures 
in {{notifyCheckpointCompleted}} calls.
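
For illustration, a standalone sketch of what Option I boils down to; the 
class and method names below are made up for this example and are not the 
actual StreamTask internals:

{code:java}
// Sketch only: a failure in the synchronous checkpoint phase fails the
// task right away, regardless of setTolerableCheckpointFailureNumber(...).
final class SyncPhaseFailurePolicy {

    /** Runs the synchronous snapshot phase of one checkpoint. */
    void runSynchronousPhase(long checkpointId, Runnable snapshotAllOperators) {
        try {
            snapshotAllOperators.run();
        } catch (RuntimeException e) {
            // Option I: do not count this against any tolerable-failure
            // threshold; escalate so the task fails over immediately.
            throw new RuntimeException(
                "Synchronous part of checkpoint " + checkpointId
                    + " failed, failing the task", e);
        }
    }
}
{code}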

Option II.

If we want to keep {{setTolerableCheckpointFailureNumber(...)}} for 
synchronous failures, because some operators may be able to tolerate 
snapshot/committing failures, it would have to be implemented inside the 
operators: FlinkKafkaProducer would have to remember an exception and keep 
re-throwing it, or throw "Rejecting writes because of a previous failure". 
Alternatively, we would have to provide some API (an interface? a 
{{boolean canTolerateCheckpointingFailure()}} getter? wrapping an exception in 
{{FlinkAbortCheckpointException(...)}} and tolerating failures of only that 
exception type?) for StreamTask to detect whether an operator can recover from 
checkpoint failures, and implement this logic somewhere in 
StreamTask/OperatorChain/OperatorWrapper.
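
For example, the hypothetical API could look like one of the following; none 
of these types exist in Flink today, this is just a sketch of the idea:

{code:java}
// Hypothetical opt-in interface: an operator declares whether it can
// safely continue running after a failed snapshot/commit.
interface CheckpointFailureTolerant {
    boolean canTolerateCheckpointingFailure();
}

// Alternative hypothetical variant: operators wrap recoverable failures
// in a dedicated exception type, and StreamTask tolerates only that type.
class FlinkAbortCheckpointException extends RuntimeException {
    FlinkAbortCheckpointException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}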

I'm strongly in favour of Option I: it's much simpler, while Option II doesn't 
seem to add much value and would require support from the operators' side.

Asynchronous failures are acceptable, as the in-memory state of 
operators/functions is kept consistent and is merely waiting for the 
{{notifyCheckpointCompleted}} call. From the operators' perspective, ignoring 
an asynchronous failure of Checkpoint N and successfully completing 
Checkpoint N+1 is indistinguishable from the {{notifyCheckpointCompleted}} for 
Checkpoint N being lost in transit - a valid scenario that our exactly-once 
sinks already handle correctly.
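
To illustrate why the lost notification is harmless, here is a simplified, 
self-contained sketch of that sink pattern (modelled loosely after 
{{TwoPhaseCommitSinkFunction}}, not its real code): the notification for 
Checkpoint N+1 commits every pending transaction up to N+1, including the one 
from Checkpoint N.

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Sketch only. Pre-committed transactions pile up per checkpoint; a
// notification for checkpoint id C commits everything with id <= C.
final class TwoPhaseCommitSketch<TXN> {

    // pending pre-committed transactions, keyed by checkpoint id
    private final TreeMap<Long, TXN> pendingByCheckpoint = new TreeMap<>();

    void snapshotState(long checkpointId, TXN preCommittedTxn) {
        pendingByCheckpoint.put(checkpointId, preCommittedTxn);
    }

    void notifyCheckpointCompleted(long completedCheckpointId) {
        // Committing everything up to the completed id is what makes a
        // dropped notification for an earlier checkpoint harmless.
        Iterator<Map.Entry<Long, TXN>> it =
            pendingByCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> entry = it.next();
            if (entry.getKey() > completedCheckpointId) {
                break;
            }
            commit(entry.getValue());
            it.remove();
        }
    }

    private void commit(TXN txn) {
        // the external-system commit would happen here
    }
}
{code}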

> StreamTask should always fail immediately on failures in synchronous part of 
> a checkpoint
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-17350
>                 URL: https://issues.apache.org/jira/browse/FLINK-17350
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.6.4, 1.7.2, 1.8.3, 1.9.2, 1.10.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> This bug also affects the 1.5.x branch.
> As described in point 1 here: 
> https://issues.apache.org/jira/browse/FLINK-17327?focusedCommentId=17090576&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17090576
> {{setTolerableCheckpointFailureNumber(...)}} and its deprecated 
> {{setFailTaskOnCheckpointError(...)}} predecessor are implemented 
> incorrectly. Since Flink 1.5 
> (https://issues.apache.org/jira/browse/FLINK-4809) they can lead to 
> operators (and especially sinks with external state) ending up in an 
> inconsistent state. That is true even when they are not used, because of 
> another issue: FLINK-17351.
> Mix this with an intermittent external system failure: the sink reports an 
> exception, the transaction is lost/aborted, and the sink ends up in a failed 
> state; but if, by happy coincidence, it still manages to accept further 
> records, the exception can be lost and all of the records from those failed 
> checkpoints will be lost forever as well.
> For details please check FLINK-17327.



