2017-11-15 15:23 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> Hi Romain,
>
> 1. you always have the GlobalWindow at least. It's more related to the
> trigger.
Yep, but having an implicit window and being able to rely on window
features are two different things. I think some runners don't support it
yet, so it may not be reliable enough - this was my main concern. If that
is not true, then it is ok.

> 2. How would you define this ? With annotation (on what in that case) or
> using checkpoint method ?

Both work: an annotation on a new type of method in the DoFn, or an
external checkpoint algorithm. I tend to think the first is easier.

> 3. Agree to have a core PTransform for that.
>
> Regards
> JB
>
>
> On 11/15/2017 02:16 PM, Romain Manni-Bucau wrote:
>>
>> @Reuven: it looks like a good workaround
>> @Ken: thanks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without windowing usage (to have
>> something more reliable in terms of runners, since it will depend on
>> fewer primitives)?
>> 2. what about allowing the user to define when to checkpoint?
>> 3. can we get this kind of "composite" pattern in the beam core?
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
>>>
>>> In case the connection is not clear to folks on this thread, I pinged
>>> the thread on @StableReplay / @RequiresStableInput / etc and opened a
>>> draft PR at https://github.com/apache/beam/pull/4135.
>>>
>>> On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
>>> wrote:
>>>
>>>> so I think the following will do exactly that and can be easily
>>>> factored into a reusable transform (modulo Java type boilerplate):
>>>>
>>>> pCollection.apply(WithKeys.of((Element e) ->
>>>>         ThreadLocalRandom.current().nextInt(N)))
>>>>     .apply(Window.into(new GlobalWindows())
>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
>>>>     .apply(GroupByKey.create())
>>>>     .apply(ParDo.of(new DoFn<>() {
>>>>       @ProcessElement
>>>>       @StableReplay
>>>>       public void processElement(ProcessContext c) {
>>>>         // Insert c.element().getValue() into backend.
>>>>       }
>>>>     }));
>>>>
>>>> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau
>>>> <rmannibu...@gmail.com> wrote:
>>>>
>>>>> 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>
>>>>>> Can we describe this at a higher level?
>>>>>>
>>>>>> I think what you want is the following. Please correct if I'm
>>>>>> misunderstanding.
>>>>>>
>>>>>> Batches of 100 elements (is this a hard requirement, or do they
>>>>>> have to be "approximately" 100 elements?)
>>>>>
>>>>> Approximately is fine as long as it is documented (what is not fine
>>>>> is 1000000 instead of 10, for instance).
>>>>>
>>>>>> Once you see a batch, you're guaranteed to see the same batch on
>>>>>> retries.
>>>>>
>>>>> +1
>>>>>
>>>>>> You want to then idempotently insert this batch into some backend.
>>>>>> Things may fail, workers may crash, but in that case you want to
>>>>>> get the exact same batch back so you can insert it again.
>>>>>
>>>>> +1
>>>>>
>>>>>> Do you care about ordering? On failure do you have to see the same
>>>>>> batches in the same order as before, or is it sufficient to see
>>>>>> the same batches?
>>>>>
>>>>> Beam doesn't guarantee ordering everywhere, so I guess it is not
>>>>> important - at least for my cases this statement is true.
>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> The overall goal is to ensure that, every 100 elements max, a
>>>>>>> "backend" (e.g. a datastore) flush/commit/push is done and is
>>>>>>> aligned with beam checkpoints.
>>>>>>> You can see it as bringing the "general" commit-interval notion
>>>>>>> to beam, and kind of getting rid of the bundle notion, which is
>>>>>>> almost impossible to use today.
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>
>>>>>>>
>>>>>>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>>
>>>>>>>> It's in the dev list archives, not sure if there's a doc yet.
>>>>>>>>
>>>>>>>> I'm not quite sure I understand what you mean by a "flush". Can
>>>>>>>> you describe the problem you're trying to solve?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I didn't find the doc - if you have the link not far, it
>>>>>>>>> would be appreciated - but "before" sounds not enough, it
>>>>>>>>> should be "after" in case there was a "flush", no?
>>>>>>>>>
>>>>>>>>> Romain Manni-Bucau
>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>>>>
>>>>>>>>>> If you set @StableReplay before a ParDo, it forces a
>>>>>>>>>> checkpoint before that ParDo.
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It sounds like a good start. I'm not sure how a group by key
>>>>>>>>>>> (and not by size) can help controlling the checkpointing
>>>>>>>>>>> interval. I wonder if we shouldn't be able to have a
>>>>>>>>>>> CheckpointPolicy { boolean shouldCheckpoint() } used in the
>>>>>>>>>>> processing event loop.
>>>>>>>>>>> The default could be up to the runner, but if set on the
>>>>>>>>>>> transform (or DoFn) it would be used to control when the
>>>>>>>>>>> checkpoint is done. Thinking out loud, it sounds close to the
>>>>>>>>>>> jbatch checkpoint algorithm
>>>>>>>>>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/
>>>>>>>>>>> chunk/CheckpointAlgorithm.html)
>>>>>>>>>>>
>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Romain,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the @StableReplay semantic that Kenn proposed a
>>>>>>>>>>>>> month or so ago is what is needed here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Essentially it will ensure that the GroupByKey iterable is
>>>>>>>>>>>>> stable and checkpointed. So on replay, the GroupByKey is
>>>>>>>>>>>>> guaranteed to receive the exact same iterable as it did
>>>>>>>>>>>>> before. The annotation can be put on a ParDo as well, in
>>>>>>>>>>>>> which case it ensures stability (and checkpointing) of the
>>>>>>>>>>>>> individual ParDo elements.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Romain,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You are right: currently, the chunking is related to
>>>>>>>>>>>>>>> bundles. Today, the bundle size is under the runner's
>>>>>>>>>>>>>>> responsibility.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think that's fine because only the runner knows an
>>>>>>>>>>>>>>> efficient bundle size. I'm afraid giving "control" of
>>>>>>>>>>>>>>> the bundle size to the end user (via the pipeline) can
>>>>>>>>>>>>>>> result in huge performance issues depending on the
>>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It doesn't mean that we can't use an uber layer: it's
>>>>>>>>>>>>>>> what we do in ParDoWithBatch or DoFn in IO Sink where we
>>>>>>>>>>>>>>> have a batch size.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Anyway, the core problem is about the checkpoint: why is
>>>>>>>>>>>>>>> a checkpoint not "respected" by an IO or runner?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Take the example of a runner deciding the bundle size is
>>>>>>>>>>>>>> 4 and the IO deciding the commit-interval (batch
>>>>>>>>>>>>>> semantic) is 2: what happens if the 3rd record fails?
>>>>>>>>>>>>>> You have already pushed 2 records to the store; they can
>>>>>>>>>>>>>> be reprocessed by a restart of the bundle, and you can
>>>>>>>>>>>>>> get duplicates.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Rephrased: I think we need, as a framework, a batch/chunk
>>>>>>>>>>>>>> solution which is reliable. I understand bundles are
>>>>>>>>>>>>>> mapped onto the runner and not really controlled, but can
>>>>>>>>>>>>>> we get something more reliable for the user? Maybe we
>>>>>>>>>>>>>> need a @BeforeBatch or something like that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The subject is a bit provocative, but the topic is real
>>>>>>>>>>>>>>>> and comes up again and again with beam usage: how can a
>>>>>>>>>>>>>>>> DoFn handle some "chunking"?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The need is to be able to commit every N records, but
>>>>>>>>>>>>>>>> with N not too big.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The natural API for that in beam is the bundle one, but
>>>>>>>>>>>>>>>> bundles are not reliable since they can be very small
>>>>>>>>>>>>>>>> (flink) - we can say it is "ok" even if it has some
>>>>>>>>>>>>>>>> perf impacts - or too big (spark does full size /
>>>>>>>>>>>>>>>> #workers).
>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize
>>>>>>>>>>>>>>>> which does an eager flush. The issue is that then the
>>>>>>>>>>>>>>>> checkpoint is not respected and you can process the
>>>>>>>>>>>>>>>> same records multiple times.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any plan to make this API reliable and controllable
>>>>>>>>>>>>>>>> from a beam point of view (at least in a max manner)?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
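The duplicate-on-retry scenario discussed in this thread (a runner that
retries a whole bundle of 4 elements while the IO eagerly flushes every 2)
can be sketched as a small plain-Java simulation, with no Beam dependency.
All names here (BundleRetrySimulation, CommitPolicy, processBundle) are
hypothetical illustrations, not Beam API; the CommitPolicy interface merely
echoes the CheckpointPolicy { boolean shouldCheckpoint() } idea from the
thread:

```java
import java.util.ArrayList;
import java.util.List;

public class BundleRetrySimulation {

    /** Hypothetical stand-in for the CheckpointPolicy idea from the thread. */
    interface CommitPolicy {
        boolean shouldCommit(int bufferedCount);
    }

    /** Simulated datastore that the IO flushes into. */
    static final List<String> backend = new ArrayList<>();

    /**
     * Processes one bundle, flushing eagerly per the policy; throws when it
     * reaches the "poison" element, simulating a mid-bundle failure.
     */
    static void processBundle(List<String> bundle, CommitPolicy policy, String poison) {
        List<String> buffer = new ArrayList<>();
        for (String element : bundle) {
            if (element.equals(poison)) {
                throw new RuntimeException("element failed: " + element);
            }
            buffer.add(element);
            if (policy.shouldCommit(buffer.size())) {
                // Eager flush, NOT aligned with the runner's checkpoint.
                backend.addAll(buffer);
                buffer.clear();
            }
        }
        backend.addAll(buffer); // flush the remainder at bundle finish
    }

    public static void main(String[] args) {
        List<String> bundle = List.of("r1", "r2", "r3", "r4"); // bundle size 4
        CommitPolicy everyTwo = buffered -> buffered >= 2;      // commit-interval 2

        try {
            processBundle(bundle, everyTwo, "r3"); // first attempt: 3rd record fails
        } catch (RuntimeException e) {
            // The runner restarts the whole bundle from scratch.
        }
        processBundle(bundle, everyTwo, null); // retry succeeds

        // r1 and r2 were flushed before the failure, so the retry duplicates them.
        System.out.println(backend); // [r1, r2, r1, r2, r3, r4]
    }
}
```

This is exactly why the thread argues that the flush must be aligned with
a checkpoint (via @StableReplay or an explicit checkpoint policy) rather
than performed eagerly inside a bundle that the runner may replay.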