[ https://issues.apache.org/jira/browse/FLINK-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943094#comment-15943094 ]
Ventura Del Monte commented on FLINK-6141:
------------------------------------------
I would like to tackle this issue, if that is fine with you. I think I see the
complexity behind this task, and I would proceed as follows: each element coming
from the target stream would be buffered in a per-namespace, per-key-group
BufferSpiller-like object; when taking a checkpoint or savepoint, I would
include all the buffered elements in the resulting snapshot. Everything would be
executed at the AbstractStreamOperator level, although buffering there adds
serialization overhead. This is just a thought, but a more efficient approach
might be to buffer at the task level, i.e., to handle the buffers holding the
elements directly, before the elements are deserialized; for keyed streams,
however, we would still need to extract the key of every element. By the way,
[~StephanEwen], do you have more input on this issue?
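A minimal sketch of the per-key-group, per-namespace buffering described above, assuming a plain in-memory map instead of a BufferSpiller-like spilling structure. The class KeyGroupedElementBuffer, its key selector, and the methods snapshotKeyGroup/restoreKeyGroup are hypothetical names for illustration only. keyGroupOf uses hashCode modulo the number of key groups, which is a simplified stand-in and not identical to Flink's own key-group assignment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch: buffers elements per key group and per namespace so
 * that a snapshot can be written out one key group at a time.
 */
public class KeyGroupedElementBuffer<K, N, T> {

    private final int maxParallelism;          // number of key groups
    private final Function<T, K> keySelector;  // extracts the key of every element
    // key group -> namespace -> buffered elements, in arrival order
    private final Map<Integer, Map<N, List<T>>> buffers = new HashMap<>();

    public KeyGroupedElementBuffer(int maxParallelism, Function<T, K> keySelector) {
        this.maxParallelism = maxParallelism;
        this.keySelector = keySelector;
    }

    /** Simplified key-group assignment (assumption, not Flink's exact hashing). */
    public int keyGroupOf(T element) {
        K key = keySelector.apply(element);
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public void add(N namespace, T element) {
        buffers.computeIfAbsent(keyGroupOf(element), kg -> new HashMap<>())
               .computeIfAbsent(namespace, ns -> new ArrayList<>())
               .add(element);
    }

    /** Read-only view of the elements buffered for one namespace in one key group. */
    public List<T> values(int keyGroup, N namespace) {
        Map<N, List<T>> byNamespace = buffers.getOrDefault(keyGroup, Collections.emptyMap());
        return Collections.unmodifiableList(
                byNamespace.getOrDefault(namespace, Collections.emptyList()));
    }

    /** Drops the given namespace from every key group. */
    public void clear(N namespace) {
        for (Map<N, List<T>> byNamespace : buffers.values()) {
            byNamespace.remove(namespace);
        }
    }

    /** Copies the contents of a single key group, e.g. while taking a checkpoint. */
    public Map<N, List<T>> snapshotKeyGroup(int keyGroup) {
        Map<N, List<T>> copy = new HashMap<>();
        for (Map.Entry<N, List<T>> e
                : buffers.getOrDefault(keyGroup, Collections.emptyMap()).entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return copy;
    }

    /** Replaces the contents of a single key group with a previously taken snapshot. */
    public void restoreKeyGroup(int keyGroup, Map<N, List<T>> snapshot) {
        Map<N, List<T>> byNamespace = new HashMap<>();
        snapshot.forEach((ns, elements) -> byNamespace.put(ns, new ArrayList<>(elements)));
        buffers.put(keyGroup, byNamespace);
    }
}
```

Grouping by key group first means each key group's buffered elements can be snapshotted and restored independently, which is what lets them follow their keys when the operator is rescaled.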
> Add buffering service for stream operators
> ------------------------------------------
>
> Key: FLINK-6141
> URL: https://issues.apache.org/jira/browse/FLINK-6141
> Project: Flink
> Issue Type: Sub-task
> Components: DataStream API
> Reporter: Aljoscha Krettek
>
> As mentioned in
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> we need a way of buffering incoming elements until a side input that is
> required for processing them is ready.
> There has to be an implementation for non-keyed operators and for keyed
> operators because in keyed operators we need to ensure that we store the
> buffered elements in the correct key group when checkpointing.
> For the interface, I propose this:
> {code}
> import org.apache.flink.annotation.PublicEvolving;
>
> @PublicEvolving
> public interface ElementBuffer<T, N> {
>
>     /**
>      * Adds the given element to the buffer for the given namespace.
>      */
>     void add(N namespace, T element);
>
>     /**
>      * Returns an {@code Iterable} over all buffered elements for the given
>      * namespace.
>      */
>     Iterable<T> values(N namespace);
>
>     /**
>      * Clears all buffered elements for the given namespace.
>      */
>     void clear(N namespace);
> }
> {code}
> {{AbstractStreamOperator}} would provide a method {{getElementBuffer()}} that
> would return the appropriate implementation for a non-keyed or keyed operator.
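To make the proposed interface concrete, here is a minimal heap-backed sketch for the non-keyed case, together with a hypothetical processOrBuffer helper showing the FLIP-17 pattern: buffer elements while the side input is not ready, then process and clear them once it is. The @PublicEvolving annotation is dropped so the sketch compiles without Flink on the classpath; HeapElementBuffer and processOrBuffer are illustrative names, not part of Flink, and nothing here is checkpointed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** The interface proposed in the issue, minus the Flink annotation. */
interface ElementBuffer<T, N> {
    void add(N namespace, T element);
    Iterable<T> values(N namespace);
    void clear(N namespace);
}

/** Hypothetical heap-backed buffer for a non-keyed operator (no checkpointing). */
public class HeapElementBuffer<T, N> implements ElementBuffer<T, N> {

    // namespace -> buffered elements, in arrival order
    private final Map<N, List<T>> buffers = new HashMap<>();

    @Override
    public void add(N namespace, T element) {
        buffers.computeIfAbsent(namespace, ns -> new ArrayList<>()).add(element);
    }

    @Override
    public Iterable<T> values(N namespace) {
        return Collections.unmodifiableList(
                buffers.getOrDefault(namespace, Collections.emptyList()));
    }

    @Override
    public void clear(N namespace) {
        buffers.remove(namespace);
    }

    /**
     * Hypothetical usage pattern: while the side input is not ready, buffer the
     * element; once it is, first process everything buffered, then clear the
     * buffer, then process the current element.
     */
    public static <T, N> void processOrBuffer(ElementBuffer<T, N> buffer, N namespace,
                                              T element, boolean sideInputReady,
                                              Consumer<T> process) {
        if (!sideInputReady) {
            buffer.add(namespace, element);
            return;
        }
        for (T buffered : buffer.values(namespace)) {
            process.accept(buffered);
        }
        buffer.clear(namespace);
        process.accept(element);
    }
}
```

A keyed variant would additionally have to scope each namespace to the current key and write its contents per key group during a snapshot, which is why the issue asks for separate keyed and non-keyed implementations.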
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)