2017-11-16 12:18 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> @Reuven: it looks like a good workaround
>> @Ken: thanks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without windowing usage (to have something more reliable in terms of runners, since it will depend on fewer primitives)?
>
> This depends on triggering, not on windowing. Triggering is a pretty core component of the model - no unbounded input can be processed at all without triggering. "Checkpointing" is a harder thing to pin down, as it means different things to different runners (e.g. "checkpointing" in Flink means something very different than in Dataflow, and different again than in Spark).
My bad, sorry about that semantic abuse. The point still stands, however: several IOs (all of them, in fact) are trigger independent, so I wonder if we want to couple them to triggering for a "core feature" (chunking is needed in almost all IOs).

>> 2. what about allowing the user to define when to checkpoint?
>
> As I mentioned, "checkpoint" is sometimes an ill-defined operation, especially across different runners. Instead I think it's better to have an annotation that defines the semantics you want (e.g. stable replay), and let the runner decide how to implement it (possibly by checkpointing).

The issue with that solution is that it doesn't scale: it requires an implementation per semantic instead of relying on a single primitive for all cases. Given the runner design and the portability goal of Beam, we should probably avoid that, no?

>> 3. can we get this kind of "composite" pattern in the beam core?
>
> I don't see why not. Though we first need to get @StableReplay implemented.

>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
>> > In case the connection is not clear to folks on this thread, I pinged the thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR at https://github.com/apache/beam/pull/4135.
>> >
>> > On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid> wrote:
>> >
>> >> so I think the following will do exactly that and can be easily factored into a reusable transform (modulo Java type boilerplate):
>> >>
>> >> pCollection
>> >>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
>> >>     .apply(Window.<KV<Integer, Element>>into(new GlobalWindows())
>> >>         .triggering(AfterWatermark.pastEndOfWindow()
>> >>             .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
>> >>         .discardingFiredPanes())
>> >>     .apply(GroupByKey.create())
>> >>     .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Element>>, Void>() {
>> >>       @ProcessElement
>> >>       @StableReplay
>> >>       public void processElement(ProcessContext c) {
>> >>         // Insert c.element().getValue() into backend.
>> >>       }
>> >>     }));
>> >>
>> >> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>
>> >> > 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > > Can we describe this at a higher level?
>> >> > >
>> >> > > I think what you want is the following. Please correct if I'm misunderstanding.
>> >> > >
>> >> > > Batches of 100 elements (is this a hard requirement, or do they have to be "approximately" 100 elements?)
>> >> >
>> >> > Approximately is fine as long as it is documented (what is not fine is 1000000 instead of 10, for instance)
>> >> >
>> >> > > Once you see a batch, you're guaranteed to see the same batch on retries.
>> >> >
>> >> > +1
>> >> >
>> >> > > You want to then idempotently insert this batch into some backend. Things may fail, workers may crash, but in that case you want to get the exact same batch back so you can insert it again.
>> >> >
>> >> > +1
>> >> >
>> >> > > Do you care about ordering? On failure do you have to see the same batches in the same order as before, or is it sufficient to see the same batches?
>> >> >
>> >> > Beam doesn't guarantee ordering everywhere, so I guess it is not important - at least for my cases this statement is true.
>> >> >
>> >> > > Reuven
>> >> > >
>> >> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >> > >
>> >> > >> The overall goal is to ensure that every 100 elements max, a "backend" (e.g. a datastore) flush/commit/push is done and is aligned with beam checkpoints.
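The requirement stated in this exchange - commit every N records at most, independently of how the runner sizes bundles - can be sketched in plain Java, outside any Beam API. `Batcher` and everything else here are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Main {
    // Hypothetical chunking helper, not a Beam API: buffers elements and
    // hands a batch of at most maxSize elements to a flush callback.
    static class Batcher<T> {
        private final int maxSize;
        private final Consumer<List<T>> flush;
        private final List<T> buffer = new ArrayList<>();

        Batcher(int maxSize, Consumer<List<T>> flush) {
            this.maxSize = maxSize;
            this.flush = flush;
        }

        void add(T element) {
            buffer.add(element);
            if (buffer.size() >= maxSize) {
                flushNow();
            }
        }

        // Flush whatever is buffered, e.g. at end of input or on checkpoint.
        void flushNow() {
            if (!buffer.isEmpty()) {
                flush.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> committed = new ArrayList<>();
        Batcher<Integer> batcher = new Batcher<>(100, committed::add);
        for (int i = 0; i < 250; i++) {
            batcher.add(i);
        }
        batcher.flushNow();
        // 250 elements with maxSize = 100 -> batches of 100, 100 and 50.
        System.out.println(committed.size());
        System.out.println(committed.get(2).size());
    }
}
```

The hard part, as the thread notes, is not the buffering itself but making each flush line up with the runner's checkpoints so a retry replays the same batches.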
You can see it as bringing the "general" commit-interval notion to beam and kind of getting rid of the bundle notion, which is almost impossible to use today.
>> >> > >>
>> >> > >> Romain Manni-Bucau
>> >> > >> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >> > >>
>> >> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > >> > It's in the dev list archives, not sure if there's a doc yet.
>> >> > >> >
>> >> > >> > I'm not quite sure I understand what you mean by a "flush". Can you describe the problem you're trying to solve?
>> >> > >> >
>> >> > >> > Reuven
>> >> > >> >
>> >> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >> > >> >
>> >> > >> >> Hmm, I didn't find the doc - if you have the link handy it would be appreciated - but "before" doesn't sound like enough; it should be "after" in case there was a "flush", no?
>> >> > >> >>
>> >> > >> >> Romain Manni-Bucau
>> >> > >> >> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >> > >> >>
>> >> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before that ParDo.
>> >> > >> >> >
>> >> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >> > >> >> >
>> >> > >> >> >> It sounds like a good start. I'm not sure how a group by key (and not by size) can help control the checkpointing interval. I wonder if we shouldn't have a CheckpointPolicy { boolean shouldCheckpoint() } used in the processing event loop.
>> >> > >> >> >> The default could be up to the runner, but if set on the transform (or dofn) it would be used to control when the checkpoint is done. Thinking out loud, it sounds close to the jbatch checkpoint algorithm (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html).
>> >> > >> >> >>
>> >> > >> >> >> Romain Manni-Bucau
>> >> > >> >> >> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >> > >> >> >>
>> >> > >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> > >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
>> >> > >> >> >> >
>> >> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >> > >> >> >> >> Romain,
>> >> > >> >> >> >>
>> >> > >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or so ago is what is needed here.
>> >> > >> >> >> >>
>> >> > >> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable and checkpointed. So on replay, the GroupByKey is guaranteed to receive the exact same iterable as it did before. The annotation can be put on a ParDo as well, in which case it ensures stability (and checkpointing) of the individual ParDo elements.
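The CheckpointPolicy idea floated above, modeled on JBatch's CheckpointAlgorithm, could look roughly like this. This is a hypothetical API sketch, not existing Beam code; the intent is that a runner would consult the policy in its record-processing loop and checkpoint when it says so:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Hypothetical interface, analogous to javax.batch CheckpointAlgorithm.
    interface CheckpointPolicy {
        boolean shouldCheckpoint(); // called once per processed record
        void checkpointed();        // reset internal state after a checkpoint
    }

    // Item-count policy: checkpoint every `interval` records
    // (the equivalent of JBatch's ItemCheckpointAlgorithm).
    static class EveryN implements CheckpointPolicy {
        private final int interval;
        private int seen;

        EveryN(int interval) { this.interval = interval; }

        // Note: counts the record as a side effect of being asked.
        public boolean shouldCheckpoint() { return ++seen >= interval; }

        public void checkpointed() { seen = 0; }
    }

    public static void main(String[] args) {
        CheckpointPolicy policy = new EveryN(3);
        List<Integer> checkpointsAt = new ArrayList<>();
        // Simulated runner event loop over 10 records.
        for (int record = 1; record <= 10; record++) {
            // ... process the record, then ask the policy ...
            if (policy.shouldCheckpoint()) {
                checkpointsAt.add(record);
                policy.checkpointed();
            }
        }
        System.out.println(checkpointsAt);
    }
}
```

With interval 3 and 10 records, the simulated loop checkpoints after records 3, 6 and 9; records after the last checkpoint would be covered by the end-of-input flush.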
>> >> > >> >> >> >>
>> >> > >> >> >> >> Reuven
>> >> > >> >> >> >>
>> >> > >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >> > >> >> >> >>
>> >> > >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> Hi Romain,
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> You are right: currently, the chunking is related to bundles. Today, the bundle size is under the runner's responsibility.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> I think it's fine because only the runner knows an efficient bundle size. I'm afraid giving the "control" of the bundle size to the end user (via the pipeline) can result in huge performance issues depending on the runner.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we do in ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a checkpoint not "respected" by an IO or runner?
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> Take the example of a runner deciding the bundle size is 4 and the IO deciding the commit-interval (batch semantic) is 2: what happens if the 3rd record fails?
>> >> > >> >> >> >>> You have pushed 2 records to the store which can be reprocessed by a restart of the bundle, and you can get duplicates.
>> >> > >> >> >> >>>
>> >> > >> >> >> >>> Rephrased: I think we need, as a framework, a batch/chunk solution which is reliable. I understand bundles are mapped on the runner and not really controlled, but can we get something more reliable for the user? Maybe we need a @BeforeBatch or something like that.
>> >> > >> >> >> >>>
>> >> > >> >> >> >>>> Regards
>> >> > >> >> >> >>>> JB
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Hi guys,
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The subject is a bit provocative but the topic is real and comes up again and again with beam usage: how can a dofn handle some "chunking"?
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The need is to be able to commit every N records, with N not too big.
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The natural API for that in beam is the bundle one, but bundles are not reliable since they can be very small (flink) - we can say it is "ok" even if it has some perf impact - or too big (spark does full size / #workers).
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which does an eager flush.
>> >> > >> >> >> >>>>> The issue is that the checkpoint is then not respected and you can process the same records multiple times.
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Any plan to make this API reliable and controllable from a beam point of view (at least in a "max" manner)?
>> >> > >> >> >> >>>>>
>> >> > >> >> >> >>>>> Thanks,
>> >> > >> >> >> >>>>> Romain Manni-Bucau
>> >> > >> >> >> >>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >> > >> >> >> >>>>
>> >> > >> >> >> >>>> --
>> >> > >> >> >> >>>> Jean-Baptiste Onofré
>> >> > >> >> >> >>>> jbono...@apache.org
>> >> > >> >> >> >>>> http://blog.nanthrax.net
>> >> > >> >> >> >>>> Talend - http://www.talend.com
>> >> > >> >> >> >
>> >> > >> >> >> > --
>> >> > >> >> >> > Jean-Baptiste Onofré
>> >> > >> >> >> > jbono...@apache.org
>> >> > >> >> >> > http://blog.nanthrax.net
>> >> > >> >> >> > Talend - http://www.talend.com
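The failure scenario discussed in this thread - runner picks a bundle of 4, the IO commits every 2 records, the 3rd record crashes the worker, and the runner retries the whole bundle - can be simulated in a few lines of plain Java to show exactly where the duplicates come from. The store, bundle and crash are all simulated here; nothing below is Beam API:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    static final List<Integer> store = new ArrayList<>(); // the simulated "backend"

    // Process a bundle, eagerly flushing every `commitInterval` records.
    // Throws when reaching record number `failAt` (1-based), mimicking a crash.
    static void runBundle(List<Integer> bundle, int commitInterval, int failAt) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < bundle.size(); i++) {
            if (i + 1 == failAt) {
                throw new RuntimeException("worker crashed on record " + (i + 1));
            }
            pending.add(bundle.get(i));
            if (pending.size() == commitInterval) {
                store.addAll(pending); // flushed before any checkpoint exists
                pending.clear();
            }
        }
        store.addAll(pending); // flush the tail of the bundle
    }

    public static void main(String[] args) {
        List<Integer> bundle = List.of(1, 2, 3, 4);
        try {
            runBundle(bundle, 2, 3);  // first attempt dies on the 3rd record
        } catch (RuntimeException e) {
            runBundle(bundle, 2, -1); // runner retries the whole bundle
        }
        // Records 1 and 2 were flushed before the crash and again on retry.
        System.out.println(store);
    }
}
```

The store ends up holding [1, 2, 1, 2, 3, 4]: the flush of the first sub-batch was not aligned with any checkpoint, so the retry replays it. This is precisely what an @StableReplay-style guarantee (or a checkpoint-aligned commit-interval) is meant to prevent.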