It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush." Can you
describe the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Hmm, I didn't find the doc - if you have the link handy it would be
> appreciated - but "before" doesn't sound sufficient; shouldn't it be
> "after", in case there was a "flush"?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > If you set @StableReplay before a ParDo, it forces a checkpoint before
> > that ParDo.
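For illustration only - @StableReplay was a proposal at this point, not a shipped Beam API (a similar capability later landed in Beam as @RequiresStableInput) - a stand-in annotation, defined here rather than imported, can sketch the idea of a checkpoint-before-ParDo marker:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class StableReplayDemo {

    // Stand-in for the proposed annotation; not a real Beam SDK type.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface StableReplay {}

    // A ParDo-like step that writes to an external system. Marking it
    // @StableReplay would tell the runner to checkpoint its input first,
    // so a retry replays exactly the same elements.
    @StableReplay
    static class WriteToSink {
        void process(String element) {
            // non-idempotent external write would go here
        }
    }

    public static void main(String[] args) {
        // A runner would inspect the annotation to decide whether to
        // checkpoint before invoking the step.
        boolean stable = WriteToSink.class.isAnnotationPresent(StableReplay.class);
        System.out.println(stable); // true
    }
}
```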
> >
> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > wrote:
> >
> >> It sounds like a good start. I'm not sure how a group by key (and not
> >> by size) can help control the checkpointing interval. I wonder if we
> >> shouldn't be able to have a CheckpointPolicy { boolean
> >> shouldCheckpoint() } used in the processing event loop. The default
> >> could be up to the runner, but if set on the transform (or DoFn) it
> >> would be used to control when the checkpoint is done. Thinking out
> >> loud, it sounds close to the JBatch checkpoint algorithm
> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
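A minimal sketch of the CheckpointPolicy idea described above (plain Java, all names hypothetical - this is not a Beam API), modeled on JSR-352's CheckpointAlgorithm with a count-based default:

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointPolicyDemo {

    // The interface Romain sketches; modeled on JSR-352's CheckpointAlgorithm.
    interface CheckpointPolicy {
        boolean shouldCheckpoint();
    }

    // One possible default: checkpoint after every N processed records.
    static final class ItemCountPolicy implements CheckpointPolicy {
        private final int interval;
        private int seen = 0;

        ItemCountPolicy(int interval) {
            this.interval = interval;
        }

        @Override
        public boolean shouldCheckpoint() {
            return ++seen % interval == 0;
        }
    }

    // Simulated processing event loop: returns the (1-based) record indices
    // at which the runner would take a checkpoint.
    static List<Integer> process(int records, CheckpointPolicy policy) {
        List<Integer> checkpoints = new ArrayList<>();
        for (int i = 1; i <= records; i++) {
            // ... element i is processed here ...
            if (policy.shouldCheckpoint()) {
                checkpoints.add(i);
            }
        }
        return checkpoints;
    }

    public static void main(String[] args) {
        System.out.println(process(7, new ItemCountPolicy(2))); // [2, 4, 6]
    }
}
```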
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >
> >> >
> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >>
> >> >> Romain,
> >> >>
> >> >> I think the @StableReplay semantic that Kenn proposed a month or so
> >> >> ago is what is needed here.
> >> >>
> >> >> Essentially it will ensure that the GroupByKey iterable is stable
> >> >> and checkpointed. So on replay, the GroupByKey is guaranteed to
> >> >> receive the exact same iterable as it did before. The annotation can
> >> >> be put on a ParDo as well, in which case it ensures stability (and
> >> >> checkpointing) of the individual ParDo elements.
> >> >>
> >> >> Reuven
> >> >>
> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> >>>>
> >> >>>> Hi Romain,
> >> >>>>
> >> >>>> You are right: currently, the chunking is related to bundles.
> >> >>>> Today, the bundle size is under the runner's responsibility.
> >> >>>>
> >> >>>> I think that's fine, because only the runner knows an efficient
> >> >>>> bundle size. I'm afraid giving "control" of the bundle size to the
> >> >>>> end user (via the pipeline) can result in huge performance issues
> >> >>>> depending on the runner.
> >> >>>>
> >> >>>> It doesn't mean that we can't use an uber layer: it's what we do
> >> >>>> in ParDoWithBatch or DoFn in IO Sink, where we have a batch size.
> >> >>>>
> >> >>>> Anyway, the core problem is about the checkpoint: why is a
> >> >>>> checkpoint not "respected" by an IO or runner?
> >> >>>
> >> >>>
> >> >>>
> >> >>> Take the example of a runner deciding the bundle size is 4 and the
> >> >>> IO deciding the commit interval (batch semantic) is 2: what happens
> >> >>> if the 3rd record fails? You have pushed 2 records to the store
> >> >>> which can be reprocessed by a restart of the bundle, and you can
> >> >>> get duplicates.
> >> >>>
> >> >>> Rephrased: I think we need, as a framework, a batch/chunk solution
> >> >>> which is reliable. I understand bundles are mapped on the runner
> >> >>> and not really controlled, but can we get something more reliable
> >> >>> for the user? Maybe we need a @BeforeBatch or something like that.
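The failure mode described above can be simulated in plain Java (illustrative only, no Beam involved): a 4-element bundle, an IO committing every 2 records, a failure on the 3rd record, then a full-bundle retry:

```java
import java.util.ArrayList;
import java.util.List;

public class BundleRetryDemo {
    static final int COMMIT_INTERVAL = 2;

    // Processes a bundle, eagerly committing to the external store every
    // COMMIT_INTERVAL records; fails at record failAt (1-based, -1 = never).
    static void processBundle(List<String> bundle, int failAt, List<String> store) {
        List<String> buffer = new ArrayList<>();
        for (int i = 0; i < bundle.size(); i++) {
            if (i + 1 == failAt) {
                throw new RuntimeException("record " + failAt + " failed");
            }
            buffer.add(bundle.get(i));
            if (buffer.size() == COMMIT_INTERVAL) {
                store.addAll(buffer); // eager flush: already visible externally
                buffer.clear();
            }
        }
        store.addAll(buffer); // flush the remainder at bundle end
    }

    static List<String> run() {
        List<String> bundle = List.of("a", "b", "c", "d");
        List<String> store = new ArrayList<>();
        try {
            processBundle(bundle, 3, store);  // the 3rd record fails...
        } catch (RuntimeException e) {
            processBundle(bundle, -1, store); // ...and the runner retries the whole bundle
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, a, b, c, d] -- "a" and "b" committed twice
    }
}
```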
> >> >>>
> >> >>>>
> >> >>>> Regards
> >> >>>> JB
> >> >>>>
> >> >>>>
> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >> >>>>>
> >> >>>>>
> >> >>>>> Hi guys,
> >> >>>>>
> >> >>>>> The subject is a bit provocative, but the topic is real and
> >> >>>>> comes up again and again with Beam usage: how can a DoFn handle
> >> >>>>> some "chunking"?
> >> >>>>>
> >> >>>>> The need is to be able to commit every N records, with N not too
> >> >>>>> big.
> >> >>>>>
> >> >>>>> The natural API for that in Beam is the bundle one, but bundles
> >> >>>>> are not reliable since they can be very small (Flink) - we can
> >> >>>>> say that is "ok" even if it has some perf impact - or too big
> >> >>>>> (Spark does full size / #workers).
> >> >>>>>
> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which does
> >> >>>>> an eager flush. The issue is that then the checkpoint is not
> >> >>>>> respected and you can process the same records multiple times.
> >> >>>>>
> >> >>>>> Any plan to make this API reliable and controllable from a Beam
> >> >>>>> point of view (at least in a max manner)?
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>> Romain Manni-Bucau
> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
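The eager-flush workaround mentioned in this mail can be sketched as follows (hypothetical names, plain Java - not the actual ElasticsearchIO code): a buffer flushed whenever maxSize is reached and once more at bundle end, independently of any runner checkpoint, which is exactly why a retry can re-send already-flushed records:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerFlushDemo {
    private final int maxSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushes = new ArrayList<>();

    EagerFlushDemo(int maxSize) {
        this.maxSize = maxSize;
    }

    // Called per element, like DoFn.processElement.
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush(); // eager flush: fires regardless of any runner checkpoint
        }
    }

    // Called at bundle end, like DoFn.finishBundle.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushes.add(new ArrayList<>(buffer)); // stand-in for a bulk write
        buffer.clear();
    }

    List<List<String>> flushes() {
        return flushes;
    }

    public static void main(String[] args) {
        EagerFlushDemo fn = new EagerFlushDemo(2);
        for (String e : List.of("a", "b", "c", "d", "e")) {
            fn.processElement(e);
        }
        fn.finishBundle();
        System.out.println(fn.flushes()); // [[a, b], [c, d], [e]]
    }
}
```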
> >> >>>>>
> >> >>>>
> >> >>>> --
> >> >>>> Jean-Baptiste Onofré
> >> >>>> jbono...@apache.org
> >> >>>> http://blog.nanthrax.net
> >> >>>> Talend - http://www.talend.com
> >> >>>
> >> >>>
> >> >>
> >> >
> >> > --
> >> > Jean-Baptiste Onofré
> >> > jbono...@apache.org
> >> > http://blog.nanthrax.net
> >> > Talend - http://www.talend.com
> >>
>
