[
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154001#comment-15154001
]
ASF GitHub Bot commented on FLINK-3257:
---------------------------------------
Github user gyfora commented on the pull request:
https://github.com/apache/flink/pull/1668#issuecomment-186142276
Thanks Paris, I like the idea. It's a correct modification of the original
algorithm that makes it much easier to implement on Flink, at the price of
buffering more records.
I have some comments on the implementation:
- Why did you introduce the ForwardingOneInputStreamTask if only the
Iteration tail will implement this behaviour? I am not sure if other
StreamTasks will do this in the future, so it might be better to just put the
logic in the StreamIterationTail instead of adding a new abstraction.
- I think the RecordPusher and UpstreamLogger embedded operators on the
head and tail tasks are not really necessary and just add complexity
to debugging and understanding the code. The UpstreamLogger only makes
checkpointing "easier" for the implementer, but we probably want to do some
custom logic there anyway. So I would probably just override the
checkpointStatesInternal method directly.
I agree that we definitely need to spill the buffered records to disk or
use managed memory for this. This can be similar to the BarrierBuffer logic. We
can combine this with checkpointing to an output stream to never have to
materialize the full state.
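To make the suggestion concrete, here is a minimal sketch of the buffering logic kept directly inside the head, following the three steps and the restart rule in the issue description quoted below. It is plain Java with no Flink APIs: the class IterationHeadSketch and all of its method names are hypothetical, records are plain strings, and the log is held in memory, whereas a real implementation would spill to disk or use managed memory as discussed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the iteration head; not an actual Flink class. */
class IterationHeadSketch {
    // In-memory log of records that arrive over the back-edge during a snapshot.
    private final Queue<String> upstreamLog = new ArrayDeque<>();
    // Stand-in for the head's downstream output.
    private final List<String> output = new ArrayList<>();
    private boolean snapshotInProgress = false;

    /** Step 1: a barrier is triggered at the head, so block output and start logging. */
    void triggerBarrier() {
        snapshotInProgress = true;
    }

    /** A record arrives from the tail over the back-edge. */
    void onFeedbackRecord(String record) {
        if (snapshotInProgress) {
            upstreamLog.add(record);   // held back, becomes part of the snapshot
        } else {
            output.add(record);        // usual execution: forward immediately
        }
    }

    /**
     * Step 3: the barrier comes back from the tail. Finalize the snapshot,
     * unblock the output and emit the logged records in FIFO order.
     */
    List<String> onFeedbackBarrier() {
        if (!snapshotInProgress) {
            throw new IllegalStateException("no snapshot in progress");
        }
        List<String> snapshot = new ArrayList<>(upstreamLog);
        snapshotInProgress = false;
        while (!upstreamLog.isEmpty()) {
            output.add(upstreamLog.poll());
        }
        return snapshot;
    }

    /** Restart: emit the records from the restored snapshot before anything else. */
    void restore(List<String> snapshot) {
        output.addAll(snapshot);
    }

    List<String> output() {
        return output;
    }
}
```

The snapshot returned by onFeedbackBarrier contains exactly the records that were in transit on the back-edge between the two barriers, which is what the restart path replays first.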
> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
> Issue Type: Improvement
> Reporter: Paris Carbone
> Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs could work
> along the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block
> its output and start an upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output and emit all records
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but
> this can serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603