[ 
https://issues.apache.org/jira/browse/FLINK-19113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17189117#comment-17189117
 ] 

Piotr Nowojski commented on FLINK-19113:
----------------------------------------

Another reason this check in {{StreamingJobGraphGenerator}} exists is a 
potential deadlock with aligned checkpoints. Data flow must be guaranteed 
for {{CheckpointBarriers}} to eventually reach the input-selectable operator. 
As it is, if the operator selects the first input and blocks the second one, 
it might never receive the checkpoint barrier for the second input.
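
To make the aligned case concrete, here is a minimal, self-contained sketch. 
It is a toy model, not Flink code: the class {{AlignedBarrierSketch}}, the 
{{Selection}} enum and the string queues are all hypothetical names used only 
for illustration. It assumes an input is blocked for alignment once its 
barrier has arrived, and that an unselected input is never read.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical, not Flink code): a two-input operator with
// aligned checkpoints and a fixed input selection.
final class AlignedBarrierSketch {
    static final String BARRIER = "BARRIER";

    enum Selection { FIRST, ALL }

    /** Returns true if the barrier is received on both inputs, false if the operator stalls. */
    static boolean checkpointCompletes(Selection selection, int maxSteps) {
        Deque<String> in1 = new ArrayDeque<>(List.of("a1", BARRIER, "a2"));
        Deque<String> in2 = new ArrayDeque<>(List.of("b1", BARRIER, "b2"));
        boolean barrier1 = false;
        boolean barrier2 = false;

        for (int step = 0; step < maxSteps; step++) {
            if (barrier1 && barrier2) {
                return true; // alignment finished, checkpoint can be taken
            }
            // Aligned checkpoints: an input is blocked once its barrier has arrived.
            boolean canRead1 = !barrier1 && !in1.isEmpty();
            // Assumption: with Selection.FIRST the second input is never read.
            boolean canRead2 = selection == Selection.ALL && !barrier2 && !in2.isEmpty();

            if (canRead1) {
                barrier1 = BARRIER.equals(in1.poll());
            } else if (canRead2) {
                barrier2 = BARRIER.equals(in2.poll());
            } else {
                return false; // nothing readable: the barrier on input 2 is never seen
            }
        }
        return barrier1 && barrier2;
    }
}
```

In this model, {{Selection.FIRST}} stalls as soon as the first input's barrier 
arrives, while {{Selection.ALL}} lets both barriers through.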

Unaligned checkpoints solve this problem only partially. 
The design goal for unaligned checkpoints is that the {{CheckpointBarriers}} 
should flow through the job graph even if the pipeline is stalled. However, for 
that to happen, tasks cannot be blocked. If upstream tasks of the second input 
are blocked for whatever reason (for example, while serialising a large record 
or in the exploding flat map case), they won't be able to process even 
unaligned checkpoints, so the checkpoint would again deadlock.

There is also a deadlock problem with the diamond pattern, even without 
checkpointing.

> Add support for checkpointing with selectable inputs
> ----------------------------------------------------
>
>                 Key: FLINK-19113
>                 URL: https://issues.apache.org/jira/browse/FLINK-19113
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>
> Currently, there is a validation in StreamingJobGraphGenerator that fails if 
> an operator implements InputSelectable and checkpointing is enabled.
>  
> One issue when removing this validation is that with Unaligned checkpoints 
> recovery would deadlock if there are records spanning multiple buffers.
> Consider the following case:
> - Two {{InputGate}}s
> - Input selection is not ALL (say FIRST initially)
> - Unaligned Checkpoints ON
> - on recovery, there are "parts" of records in all channels (actually 1 is 
> enough I think)
> On recovery,
> 1. {{StreamTask}} initiates recovery and schedules a partition request upon 
> its end
> 2. All gates and channels will *receive* buffers from {{StateReader}}
> 3. All channels of a *single* gate will *consume* those state buffers - 
> completing that gate's {{StateConsumedFuture}}
> 4. {{InputProcessor}} will return {{NOTHING_AVAILABLE}} (see 
> {{StreamTwoInputProcessor.getInputStatus}})
> 5. {{StreamTask}} will suspend its default action
> 6. State of the 2nd gate won't be consumed - so its {{StateConsumedFuture}}s 
> won't be completed - so no partitions will be requested
>  
> A simple solution is to request partitions for each channel independently.
>  
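
The recovery scenario quoted above can be sketched with plain 
{{CompletableFuture}}s. This is a toy model under stated assumptions, not the 
Flink implementation: {{PartitionRequestSketch}} and {{recover}} are 
hypothetical names, each gate is reduced to a single "state consumed" future, 
and the current behaviour is assumed to wait for all of those futures before 
requesting any partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model (hypothetical, not Flink code): two gates, each with a future
// that completes once the gate's recovered state has been consumed.
final class PartitionRequestSketch {

    /** Returns the indices of the gates whose partitions end up being requested. */
    static List<Integer> recover(boolean requestIndependently) {
        List<CompletableFuture<Void>> stateConsumed = List.of(
                new CompletableFuture<Void>(),
                new CompletableFuture<Void>());
        List<Integer> requested = new ArrayList<>();

        if (requestIndependently) {
            // Proposed fix: request each gate's partitions as soon as its own state is consumed.
            for (int i = 0; i < stateConsumed.size(); i++) {
                final int gate = i;
                stateConsumed.get(i).thenRun(() -> requested.add(gate));
            }
        } else {
            // Assumed current behaviour: request partitions only after ALL state is consumed.
            CompletableFuture
                    .allOf(stateConsumed.toArray(new CompletableFuture[0]))
                    .thenRun(() -> {
                        for (int i = 0; i < stateConsumed.size(); i++) {
                            requested.add(i);
                        }
                    });
        }

        // Input selection is FIRST: only the first gate's state is ever consumed.
        stateConsumed.get(0).complete(null);
        return requested;
    }
}
```

With the combined future, no partition is requested because the second gate's 
future never completes. With independent requests, the first gate can fetch 
new data, which is what a record spanning multiple buffers needs in order to 
be completed.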



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
