[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829678#comment-15829678 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user senorcarbone commented on the issue:

    https://github.com/apache/flink/pull/1668
  
    The last update implements a variant of what @StephanEwen proposes. We have
    also put some more thought into this offline, thanks to @gyfora! The idea is
    that instead of appending records to each `ListState`, the output log is
    partitioned into multiple log "slices", one per concurrent checkpoint.
    
    More specifically, the `UpstreamLogger` operator at the `IterationHead`
    slices the log according to the number of concurrent snapshots. This also
    allows committed output logs to be uniquely identified and cleared after each
    completed checkpoint. The design is based on the following assumptions:
    
    - A slice is named after a checkpoint ID. Checkpoint IDs are numerically
      ordered within an execution.
    - Checkpoint barriers arrive back in FIFO order, so we discard log slices in
      the same FIFO order.
    - Upon restoration the logger sorts the slices in that FIFO order and returns
      an Iterable that gives a single, unified view of the log (see the sketch
      below).
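
    To make the slicing concrete, here is a minimal, hypothetical Java sketch
    (class and method names are illustrative, not the actual `UpstreamLogger`
    code): one slice per in-flight checkpoint, slices discarded in FIFO order as
    their checkpoints complete, and a merged view built on restore.

        import java.util.ArrayList;
        import java.util.List;
        import java.util.TreeMap;

        // Illustrative sketch only: per-checkpoint log slicing as described above.
        public class SlicedOutputLog<T> {

            // One slice per concurrent checkpoint, ordered by checkpoint ID.
            private final TreeMap<Long, List<T>> slices = new TreeMap<>();

            // Open a fresh slice when checkpoint 'checkpointId' starts logging.
            public void openSlice(long checkpointId) {
                slices.put(checkpointId, new ArrayList<>());
            }

            // Append a record to the slice of the given in-flight checkpoint.
            public void append(long checkpointId, T record) {
                slices.get(checkpointId).add(record);
            }

            // Barriers come back in FIFO order, so a completed checkpoint can
            // discard its slice (and any older leftovers) in that same order.
            public void discardThrough(long checkpointId) {
                slices.headMap(checkpointId, true).clear();
            }

            // Upon restoration: concatenating slices in checkpoint-ID order
            // yields a single view of the whole log.
            public Iterable<T> unifiedView() {
                List<T> all = new ArrayList<>();
                slices.values().forEach(all::addAll);
                return all;
            }
        }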
    
    Before I polish this we need to close a memory leak. The `clear` operation
    of `State` cleans the state under the registered id, but it does not seem to
    unregister the key itself. Does anyone have an idea of how to unregister
    state properly? I hope this gets some attention so we can wrap it up; it's
    been too long :).



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along the 
> following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> its output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all in-transit records 
> in FIFO order, and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial take on the implementation.
> [1] http://arxiv.org/abs/1506.08603
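
Purely as an illustration of steps 1)-3) above, the head-side blocking/backup/flush
cycle could look roughly like the following Java sketch (all names here are
hypothetical and do not correspond to actual Flink classes or APIs):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative-only sketch of the protocol in steps 1)-3).
    class IterationHeadSketch<T> {

        // Records arriving on the back-edge while output is blocked.
        private final Queue<T> upstreamBackup = new ArrayDeque<>();
        private boolean outputBlocked = false;

        // Step 1: barrier triggered by the TaskManager -> block output and
        // start backing up records forwarded from the IterationSink.
        void onCheckpointTriggered(long checkpointId) {
            outputBlocked = true;
        }

        // Back-edge records are logged instead of emitted while blocked.
        void onFeedbackRecord(T record) {
            if (outputBlocked) {
                upstreamBackup.add(record);
            } else {
                emitDownstream(record);
            }
        }

        // Step 3: the barrier returns from the IterationSink -> finalize the
        // snapshot, unblock the output, and flush the backup in FIFO order.
        void onBarrierFromSink(long checkpointId) {
            persistSnapshot(checkpointId, upstreamBackup);
            outputBlocked = false;
            while (!upstreamBackup.isEmpty()) {
                emitDownstream(upstreamBackup.poll());
            }
        }

        private void emitDownstream(T record) { /* forward to the iteration body */ }

        private void persistSnapshot(long id, Queue<T> backup) { /* hand to the state backend */ }
    }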



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
