it gives the fn/transform the ability to save a state - it can get back on "restart" / whatever unit we can use, probably runner dependent? Without that you need to rewrite all IO usage with something like the previous pattern which makes the IO not self sufficient and kind of makes the entry cost and usage of beam way further.
In my mind it is exactly what jbatch/spring-batch uses but adapted to beam (stream in particular) case. Romain Manni-Bucau @rmannibucau | Blog | Old Blog | Github | LinkedIn 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>: > Romain, > > Can you define what you mean by checkpoint? What are the semantics, what > does it accomplish? > > Reuven > > On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <[email protected]> > wrote: > >> Yes, what I propose earlier was: >> >> I. checkpoint marker: >> >> @AnyBeamAnnotation >> @CheckpointAfter >> public void someHook(SomeContext ctx); >> >> >> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new >> CountingAlgo())) >> >> III. (I like this one less) >> >> // in the dofn >> @CheckpointTester >> public boolean shouldCheckpoint(); >> >> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element >> >> >> >> >> Romain Manni-Bucau >> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected]>: >> > How would you define it (rough API is fine)?. Without more details, it is >> > not easy to see wider applicability and feasibility in runners. >> > >> > On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau < >> [email protected]> >> > wrote: >> > >> >> This is a fair summary of the current state but also where beam can >> have a >> >> very strong added value and make big data great and smooth. >> >> >> >> Instead of this replay feature isnt checkpointing willable? In >> particular >> >> with SDF no? >> >> >> >> >> >> Le 16 nov. 2017 19:50, "Raghu Angadi" <[email protected]> a >> >> écrit : >> >> >> >> > Core issue here is that there is no explicit concept of 'checkpoint' >> in >> >> > Beam (UnboundedSource has a method 'getCheckpointMark' but that >> refers to >> >> > the checkoint on external source). Runners do checkpoint internally as >> >> > implementation detail. Flink's checkpoint model is entirely different >> >> from >> >> > Dataflow's and Spark's. >> >> > >> >> > @StableReplay helps, but it does not explicitly talk about a >> checkpoint >> >> by >> >> > design. >> >> > >> >> > If you are looking to achieve some guarantees with a sink/DoFn, I >> think >> >> it >> >> > is better to start with the requirements. I worked on exactly-once >> sink >> >> for >> >> > Kafka (see KafkaIO.write().withEOS()), where we essentially reshard >> the >> >> > elements and assign sequence numbers to elements with in each shard. >> >> > Duplicates in replays are avoided based on these sequence numbers. >> DoFn >> >> > state API is used to buffer out-of order replays. The implementation >> >> > strategy works in Dataflow but not in Flink which has a horizontal >> >> > checkpoint. KafkaIO checks for compatibility. >> >> > >> >> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau < >> >> > [email protected]> >> >> > wrote: >> >> > >> >> > > Hi guys, >> >> > > >> >> > > The subject is a bit provocative but the topic is real and coming >> >> > > again and again with the beam usage: how a dofn can handle some >> >> > > "chunking". >> >> > > >> >> > > The need is to be able to commit each N records but with N not too >> big. >> >> > > >> >> > > The natural API for that in beam is the bundle one but bundles are >> not >> >> > > reliable since they can be very small (flink) - we can say it is >> "ok" >> >> > > even if it has some perf impacts - or too big (spark does full size >> / >> >> > > #workers). >> >> > > >> >> > > The workaround is what we see in the ES I/O: a maxSize which does an >> >> > > eager flush. The issue is that then the checkpoint is not respected >> >> > > and you can process multiple times the same records. >> >> > > >> >> > > Any plan to make this API reliable and controllable from a beam >> point >> >> > > of view (at least in a max manner)? >> >> > > >> >> > > Thanks, >> >> > > Romain Manni-Bucau >> >> > > @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> > > >> >> > >> >> >>
