[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152692#comment-15152692
 ] 

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

GitHub user senorcarbone opened a pull request:

    https://github.com/apache/flink/pull/1668

    [FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative 
DataStream Jobs

    # **[WIP]**
    This is a first version of the snapshot algorithm adapted to support 
iterations. It is correct and works in practice as long as memory capacity is 
enough for its logging requirements; I am working on that limitation, hopefully 
with a little help from you. Before going into the implementation details, let 
me briefly describe the new algorithm.
    
    ## Algorithm
    
    Our existing checkpoint algorithm has a very fluid and straightforward 
protocol. It just makes sure that all checkpoint barriers are aligned in each 
operator so that all records before the barriers (pre-shot) are processed before 
taking a snapshot. Since waiting indefinitely for all records in transit within 
a cycle of an execution graph can violate termination (a crucial liveness 
property), we have to save any unprocessed records during the snapshot so that 
they can be processed later. In this take of the algorithm on Flink we assign 
that role to the `Iteration Head`. This version differs from the vanilla 
algorithm only in the following steps:
    
    1. An `Iteration Head` receives a barrier from the system runtime (as 
before) and:
    
    -  Goes into **Logging Mode**. That means that from that point on every 
record it receives from its `Iteration Sink` is buffered in its own operator 
state and **not** forwarded further until it goes back to normal mode.
    - **Forwards** the barrier to its downstream nodes (this guarantees 
liveness; otherwise we would have a deadlock).
    
    2. Eventually, the `Iteration Head` receives
     
    
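As a rough illustration of step 1 only, the sketch below models an iteration 
head that switches to logging mode when a barrier arrives from the runtime. 
The class `LoggingHeadSketch` and its methods are hypothetical and are not 
taken from this pull request or from the Flink API; records are plain strings, 
the in-memory queue stands in for operator state, and the return to normal 
mode is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for an iteration head; not a Flink class. */
final class LoggingHeadSketch {
    private boolean logging = false;
    // Stands in for the head's operator state (assumption: in-memory only).
    private final Queue<String> log = new ArrayDeque<>();
    // Stands in for the channel to downstream nodes.
    private final List<String> downstream = new ArrayList<>();

    /** Step 1: a barrier arrives from the system runtime. */
    void onBarrierFromRuntime(long checkpointId) {
        logging = true;                              // enter logging mode
        downstream.add("BARRIER-" + checkpointId);   // forward the barrier to keep the cycle live
    }

    /** A record arrives over the back-edge from the iteration sink. */
    void onFeedbackRecord(String record) {
        if (logging) {
            log.add(record);         // buffered, not forwarded
        } else {
            downstream.add(record);  // normal mode: forward as usual
        }
    }

    List<String> downstream() { return downstream; }

    List<String> logged() { return new ArrayList<>(log); }
}
```

The unbounded queue mirrors the memory concern mentioned at the top: every 
record that arrives while logging is held until the head leaves logging mode.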
    ## Example
    
    blabla
    
    
![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)
    
    
    blabla
    
    
![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)
    
    ## Current Implementation Details
    
    ## Open/Pending Issues
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1668
    
----
commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone <[email protected]>
Date:   2016-01-19T10:18:54Z

    exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone <[email protected]>
Date:   2016-02-18T15:49:19Z

    [wip] adapt snapshot mechanism for iterative jobs

----


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> its output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
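
The three steps and the restart rule quoted above can be sketched roughly as 
follows. This is a minimal model under stated assumptions, not the code in the 
pull request: `CyclicSnapshotSketch` and all of its methods are hypothetical 
names, records are plain strings, the snapshot is just an in-memory list, and 
barrier forwarding is left out so that only record handling is shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the iteration source/head; not a Flink class. */
final class CyclicSnapshotSketch {
    private final Deque<String> backup = new ArrayDeque<>(); // records in transit on the back-edge
    private final List<String> output = new ArrayList<>();   // what has been emitted downstream
    private boolean blocked = false;

    /** 1) Barrier triggered by the runtime: block output, start the backup. */
    void triggerBarrier() {
        blocked = true;
    }

    /** Record forwarded from the iteration sink over the back-edge. */
    void onRecordFromSink(String record) {
        if (blocked) {
            backup.addLast(record);
        } else {
            output.add(record);
        }
    }

    /** 3) Barrier received from the sink: finalize, unblock, emit in FIFO order. */
    List<String> onBarrierFromSink() {
        List<String> snapshot = new ArrayList<>(backup); // the finalized snapshot
        blocked = false;
        while (!backup.isEmpty()) {
            output.add(backup.pollFirst());
        }
        return snapshot;
    }

    /** Restart: emit the records of the injected snapshot first. */
    static CyclicSnapshotSketch restore(List<String> snapshot) {
        CyclicSnapshotSketch restored = new CyclicSnapshotSketch();
        restored.output.addAll(snapshot);
        return restored;
    }

    List<String> output() { return output; }
}
```

Copying the backup into the snapshot before draining it keeps the two 
independent, so the same list can later be handed to `restore` after a failure.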



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
