[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154030#comment-15154030
 ] 

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user senorcarbone commented on the pull request:

    https://github.com/apache/flink/pull/1668#issuecomment-186147597
  
    Thanks for going through it Gyula!
    
    I agree, the `ForwardingOneInputStreamTask` can be integrated into the 
`StreamIterationTail`; I will probably do that. I just wanted to make the code 
in the tail part look less messy, but you are right.
    
    About the operators, the `RecordPusher` was already there (even though it 
was not initialised correctly). I just added the `UpstreamLogger` because I 
wanted it to follow the operator lifecycle. The way it is now, I think we do not 
need to override `checkpointStatesInternal` to make the changes we discussed; 
we just need to change the operator callback method. This could also be more 
robust to changes in the StreamTask and operator interfaces and their default 
behavior. That is just my personal view, but I see your point too.
    
    I agree with the spill buffer logic. I am only a bit confused about the 
output stream part (the other part of the problem): is there something I can 
already use? I haven't checked recent changes. How does this work if we use 
the in-memory backend, for example? The blob with all the messages will be 
packaged anyway and sent within the stateHandle to the job manager in a single 
message (potentially exceeding the size limits), even if we use a stream API, 
right?


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme, Asynchronous Barrier Snapshotting (ABS) [1], 
> can include the records in transit through the back-edges of a cyclic 
> execution graph in the snapshot to achieve the same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all records in 
> transit in FIFO order, and continue the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
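The three steps and the restart rule in the description above can be sketched as a small, single-threaded state machine on the head side of the loop. This is a minimal sketch under stated assumptions, not the implementation in the pull request: `IterationHeadSketch` and its methods are hypothetical names (they are not Flink classes such as `StreamIterationHead`), channels are modeled as plain callbacks, and only the in-transit part of the snapshot is tracked (no operator state, no spilling to disk).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/**
 * Hypothetical, single-threaded model of the IterationHead (IterationSource) side
 * of ABS on a cyclic graph. Not Flink's API: all names here are illustrative.
 */
final class IterationHeadSketch<T> {
    private final Consumer<T> recordOutput;    // records emitted into the loop body
    private final LongConsumer barrierOutput;  // barriers emitted into the loop body
    private final Deque<T> upstreamLog = new ArrayDeque<>(); // back-edge records logged during an epoch
    private boolean blocked = false;           // output stays blocked while an epoch is open
    private long openEpoch = -1L;

    IterationHeadSketch(Consumer<T> recordOutput, LongConsumer barrierOutput) {
        this.recordOutput = recordOutput;
        this.barrierOutput = barrierOutput;
    }

    /** Step 1: the TaskManager triggers a barrier; forward it, block output, start logging. */
    void onTriggerBarrier(long epoch) {
        if (blocked) {
            throw new IllegalStateException("epoch " + openEpoch + " is still open");
        }
        barrierOutput.accept(epoch);
        openEpoch = epoch;
        blocked = true;
    }

    /** A record arriving over the back-edge from the IterationSink. */
    void onFeedbackRecord(T record) {
        if (blocked) {
            upstreamLog.addLast(record); // upstream backup, kept in arrival order
        } else {
            recordOutput.accept(record);
        }
    }

    /**
     * Step 3: the barrier of the open epoch came back from the IterationSink (step 2).
     * Returns the in-transit records that belong to the snapshot, then unblocks
     * and emits them downstream in FIFO order.
     */
    List<T> onBarrierFromSink(long epoch) {
        if (!blocked || epoch != openEpoch) {
            throw new IllegalStateException("unexpected barrier for epoch " + epoch);
        }
        List<T> snapshot = new ArrayList<>(upstreamLog); // finalize: logged records join the snapshot
        blocked = false;
        while (!upstreamLog.isEmpty()) {
            recordOutput.accept(upstreamLog.pollFirst());
        }
        return snapshot;
    }

    /** Restart: emit the records from the restored snapshot first, then run normally. */
    static <T> IterationHeadSketch<T> restore(
            List<T> snapshot, Consumer<T> recordOutput, LongConsumer barrierOutput) {
        snapshot.forEach(recordOutput);
        return new IterationHeadSketch<>(recordOutput, barrierOutput);
    }
}
```

In this model, a record fed back before the barrier is triggered passes straight through, while records fed back between the trigger and the returning barrier are both stored in the snapshot and replayed in order once the epoch closes. Keeping the log in a FIFO deque is what preserves the ordering that step 3 requires.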



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
