Can be. Here are the options I have in mind:

I. checkpoint marker:

    @AnyBeamAnnotation
    @CheckpointAfter
    public void someHook(SomeContext ctx);

II. a checkpoint algorithm set on the transform:

    pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less)

    // in the dofn
    @CheckpointTester
    public boolean shouldCheckpoint();

A rough sketch of what II could look like is just below.
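For illustration only - a minimal sketch of the contract behind II, mirroring the
JBatch CheckpointAlgorithm idea. CheckpointAlgorithm, CountingAlgo and
withCheckpointAlgorithm are placeholder names for this proposal; none of this
exists in Beam today:

    import java.io.Serializable;

    // strawman contract the runner would query in its element processing loop
    public interface CheckpointAlgorithm extends Serializable {
        void beginCheckpoint();          // a new chunk starts
        boolean isReadyToCheckpoint();   // asked after each processed element
        void endCheckpoint();            // the checkpoint (and backend flush) was taken
    }

    // (separate file) count-based policy: checkpoint every N elements,
    // i.e. the classical "commit-interval" behaviour
    public class CountingAlgo implements CheckpointAlgorithm {
        private final int interval;
        private int count;

        public CountingAlgo() {
            this(100);
        }

        public CountingAlgo(final int interval) {
            this.interval = interval;
        }

        @Override
        public void beginCheckpoint() {
            count = 0;
        }

        @Override
        public boolean isReadyToCheckpoint() {
            return ++count >= interval;
        }

        @Override
        public void endCheckpoint() {
            count = 0;
        }
    }

When no algorithm is set the runner keeps its current behaviour; when one is set
on the transform (or dofn) the runner uses it to decide when the checkpoint is
done, which is enough to align a datastore flush with it.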
Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-15 11:04 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> And the control is given to the DoFn developer via annotations, right ?
>
> So, bundle would be "hidden" and be internal to the runner (which makes
> sense I think) and we introduce "control" points for the DoFn developer
> that the runner will deal with.
>
> Correct ?
>
> Regards
> JB
>
> On 11/15/2017 10:58 AM, Romain Manni-Bucau wrote:
>>
>> The overall goal is to ensure that, every 100 elements max, a "backend"
>> (e.g. a datastore) flush/commit/push is done and is aligned with beam
>> checkpoints. You can see it as bringing the "general" commit-interval
>> notion to beam and kind of getting rid of the bundle notion, which is
>> almost impossible to use today.
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>
>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>
>>> It's in the dev list archives, not sure if there's a doc yet.
>>>
>>> I'm not quite sure I understand what you mean by a "flush". Can you
>>> describe the problem you're trying to solve?
>>>
>>> Reuven
>>>
>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau
>>> <rmannibu...@gmail.com> wrote:
>>>
>>>> Hmm, I didn't find the doc - if you have the link not far it would be
>>>> appreciated - but "before" doesn't sound enough, it should be "after"
>>>> in case there was a "flush", no?
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>
>>>> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>
>>>>> If you set @StableReplay before a ParDo, it forces a checkpoint
>>>>> before that ParDo.
>>>>>
>>>>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau
>>>>> <rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> It sounds like a good start. I'm not sure how a group by key (and
>>>>>> not by size) can help controlling the checkpointing interval.
>>>>>> Wonder if we shouldn't be able to have a CheckpointPolicy { boolean
>>>>>> shouldCheckpoint() } used in the processing event loop. Default
>>>>>> could be up to the runner, but if set on the transform (or dofn) it
>>>>>> would be used to control when the checkpoint is done. Thinking out
>>>>>> loud, it sounds close to the JBatch checkpoint algorithm
>>>>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>
>>>>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>
>>>>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>>>>
>>>>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>> Romain,
>>>>>>>>
>>>>>>>> I think the @StableReplay semantic that Kenn proposed a month or
>>>>>>>> so ago is what is needed here.
>>>>>>>>
>>>>>>>> Essentially it will ensure that the GroupByKey iterable is stable
>>>>>>>> and checkpointed. So on replay, the GroupByKey is guaranteed to
>>>>>>>> receive the exact same iterable as it did before. The annotation
>>>>>>>> can be put on a ParDo as well, in which case it ensures stability
>>>>>>>> (and checkpointing) of the individual ParDo elements.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>
>>>>>>>>>> Hi Romain,
>>>>>>>>>>
>>>>>>>>>> You are right: currently, the chunking is related to bundles.
>>>>>>>>>> Today, the bundle size is under the runner responsibility.
>>>>>>>>>>
>>>>>>>>>> I think it's fine because only the runner knows an efficient
>>>>>>>>>> bundle size. I'm afraid giving the "control" of the bundle size
>>>>>>>>>> to the end user (via pipeline) can result in huge performance
>>>>>>>>>> issues depending on the runner.
>>>>>>>>>>
>>>>>>>>>> It doesn't mean that we can't use an uber layer: it's what we do
>>>>>>>>>> in ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>>>>>>>>>>
>>>>>>>>>> Anyway, the core problem is about the checkpoint: why is a
>>>>>>>>>> checkpoint not "respected" by an IO or runner ?
>>>>>>>>>
>>>>>>>>> Take the example of a runner deciding the bundle size is 4 and
>>>>>>>>> the IO deciding the commit-interval (batch semantic) is 2: what
>>>>>>>>> happens if the 3rd record fails? You have pushed to the store 2
>>>>>>>>> records which can be reprocessed by a restart of the bundle and
>>>>>>>>> you can get duplicates.
>>>>>>>>>
>>>>>>>>> Rephrased: I think we need, as a framework, a batch/chunk
>>>>>>>>> solution which is reliable. I understand bundles are mapped on
>>>>>>>>> the runner and not really controlled, but can we get something
>>>>>>>>> more reliable for the user? Maybe we need a @BeforeBatch or
>>>>>>>>> something like that.
>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> The subject is a bit provocative but the topic is real and
>>>>>>>>>>> coming again and again with the beam usage: how can a dofn
>>>>>>>>>>> handle some "chunking"?
>>>>>>>>>>>
>>>>>>>>>>> The need is to be able to commit every N records but with N
>>>>>>>>>>> not too big.
>>>>>>>>>>>
>>>>>>>>>>> The natural API for that in beam is the bundle one, but bundles
>>>>>>>>>>> are not reliable since they can be very small (flink) - we can
>>>>>>>>>>> say it is "ok" even if it has some perf impacts - or too big
>>>>>>>>>>> (spark does full size / #workers).
>>>>>>>>>>>
>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>>>>>>>> does an eager flush. The issue is that then the checkpoint is
>>>>>>>>>>> not respected and you can process the same records multiple
>>>>>>>>>>> times.
>>>>>>>>>>>
>>>>>>>>>>> Any plan to make this API reliable and controllable from a beam
>>>>>>>>>>> point of view (at least in a max manner)?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> jbono...@apache.org
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
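PS: for comparison with I/II/III, this is how I understand the @StableReplay
proposal discussed above would be used. Purely illustrative: the annotation is
only a proposal at this point (name and placement may change) and WriteToStoreFn
is just an example DoFn of mine:

    import org.apache.beam.sdk.transforms.DoFn;

    public class WriteToStoreFn extends DoFn<String, Void> {

        @StableReplay // proposed annotation, not in the Beam SDK today
        @ProcessElement
        public void processElement(final ProcessContext ctx) {
            // the input element would be checkpointed/stable before this runs,
            // so a bundle retry replays exactly the same element and the push
            // to the backend can be deduplicated
            push(ctx.element());
        }

        private void push(final String record) {
            // call the datastore/backend here
        }
    }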