It's in the dev list archives; I'm not sure if there's a doc yet. I'm not quite sure I understand what you mean by a "flush". Can you describe the problem you're trying to solve?
Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Hmm, I didn't find the doc - if you have the link handy it would be
> appreciated - but "before" sounds not enough, it should be "after" in
> case there was a "flush", no?
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>
> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> If you set @StableReplay before a ParDo, it forces a checkpoint
>> before that ParDo.
>>
>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>> It sounds like a good start. I'm not sure how a group by key (and not
>>> by size) can help control the checkpointing interval. I wonder if we
>>> shouldn't be able to have a CheckpointPolicy { boolean shouldCheckpoint() }
>>> used in the processing event loop. The default could be up to the
>>> runner, but if set on the transform (or DoFn) it would be used to
>>> control when the checkpoint is done. Thinking out loud, it sounds
>>> close to the JBatch checkpoint algorithm
>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html).
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>
>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>
>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>> Romain,
>>>>>
>>>>> I think the @StableReplay semantic that Kenn proposed a month or so
>>>>> ago is what is needed here.
>>>>>
>>>>> Essentially it will ensure that the GroupByKey iterable is stable
>>>>> and checkpointed. So on replay, the GroupByKey is guaranteed to
>>>>> receive the exact same iterable as it did before. The annotation
>>>>> can be put on a ParDo as well, in which case it ensures stability
>>>>> (and checkpointing) of the individual ParDo elements.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>>>>> <rmannibu...@gmail.com> wrote:
>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>> Hi Romain,
>>>>>>>
>>>>>>> You are right: currently, the chunking is related to bundles.
>>>>>>> Today, the bundle size is under the runner's responsibility.
>>>>>>>
>>>>>>> I think that's fine because only the runner knows an efficient
>>>>>>> bundle size. I'm afraid giving "control" of the bundle size to
>>>>>>> the end user (via the pipeline) can result in huge performance
>>>>>>> issues depending on the runner.
>>>>>>>
>>>>>>> It doesn't mean that we can't use an uber layer: it's what we do
>>>>>>> in ParDoWithBatch or in DoFns in IO sinks where we have a batch
>>>>>>> size.
>>>>>>>
>>>>>>> Anyway, the core problem is about the checkpoint: why is a
>>>>>>> checkpoint not "respected" by an IO or runner?
>>>>>>
>>>>>> Take the example of a runner deciding the bundle size is 4 and the
>>>>>> IO deciding the commit interval (batch semantic) is 2: what happens
>>>>>> if the 3rd record fails? You have pushed 2 records to the store
>>>>>> which can be reprocessed by a restart of the bundle, and you can
>>>>>> get duplicates.
>>>>>>
>>>>>> Rephrased: I think we need, as a framework, a batch/chunk solution
>>>>>> which is reliable. I understand bundles are mapped on the runner
>>>>>> and not really controlled, but can we get something more reliable
>>>>>> for the user? Maybe we need a @BeforeBatch or something like that.
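[The duplicate scenario above corresponds to the eager-flush batching
pattern inside a DoFn. A minimal sketch, assuming a hypothetical
externalSink client (a placeholder for any store); with a commit
interval of 2 against a bundle of 4, a failure on the 3rd element
reproduces the duplicates:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch only: externalSink is a hypothetical placeholder.
    public class EagerFlushFn extends DoFn<String, Void> {
      private static final int MAX_BATCH_SIZE = 2; // IO-chosen commit interval
      private transient List<String> batch;

      @StartBundle
      public void startBundle() {
        batch = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= MAX_BATCH_SIZE) {
          flush(); // eager flush: writes happen before the bundle commits
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush();
      }

      private void flush() {
        // externalSink.write(batch); // records 1-2 are persisted here; if
        // record 3 then throws, the runner retries the whole bundle and
        // records 1-2 are written a second time: duplicates.
        batch.clear();
      }
    }

Flushing only in @FinishBundle would align the writes with the bundle
boundary, but gives the user no control over N, which is exactly the
gap discussed in this thread.]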
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>> Hi guys,
>>>>>>>>
>>>>>>>> The subject is a bit provocative but the topic is real and comes
>>>>>>>> up again and again with Beam usage: how can a DoFn handle some
>>>>>>>> "chunking"?
>>>>>>>>
>>>>>>>> The need is to be able to commit every N records, but with N not
>>>>>>>> too big.
>>>>>>>>
>>>>>>>> The natural API for that in Beam is the bundle one, but bundles
>>>>>>>> are not reliable since they can be very small (Flink) - we can
>>>>>>>> say it is "ok" even if it has some perf impact - or too big
>>>>>>>> (Spark does full size / #workers).
>>>>>>>>
>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>>>>> does an eager flush. The issue is that the checkpoint is then
>>>>>>>> not respected and you can process the same records multiple
>>>>>>>> times.
>>>>>>>>
>>>>>>>> Any plan to make this API reliable and controllable from a Beam
>>>>>>>> point of view (at least in a max manner)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Romain Manni-Bucau
>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
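[For reference, a sketch of the CheckpointPolicy shape Romain floats
earlier in the thread, modeled on JBatch's
javax.batch.api.chunk.CheckpointAlgorithm. This interface does not
exist in the Beam API; the names and the runner-side wiring are purely
hypothetical:

    // Hypothetical: would be consulted by the runner's processing loop;
    // returning true asks the runner to checkpoint (commit the chunk) now.
    public interface CheckpointPolicy {
      boolean shouldCheckpoint();
    }

    // Example policy: checkpoint every N records, like a JBatch
    // item-count chunk.
    public class CountingCheckpointPolicy implements CheckpointPolicy {
      private final int interval;
      private int seen;

      public CountingCheckpointPolicy(int interval) {
        this.interval = interval;
      }

      @Override
      public boolean shouldCheckpoint() {
        return ++seen % interval == 0;
      }
    }

A runner honoring such a policy would make the IO's commit interval and
the checkpoint boundary coincide, closing the replay window described
in Romain's 4-record example.]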