Yes, what I proposed earlier was:

I. checkpoint marker:

    @AnyBeamAnnotation
    @CheckpointAfter
    public void someHook(SomeContext ctx);

II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less) in the DoFn:

    @CheckpointTester
    public boolean shouldCheckpoint();

IV. in the DoFn, per element:

    @Checkpointer
    Serializable getCheckpoint();

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
> How would you define it (a rough API is fine)? Without more details, it is
> not easy to see wider applicability and feasibility in runners.
>
> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> This is a fair summary of the current state, but it is also where Beam can
>> add very strong value and make big data great and smooth.
>>
>> Instead of this replay feature, isn't checkpointing desirable? In
>> particular with SDF, no?
>>
>>
>> Le 16 nov. 2017 19:50, "Raghu Angadi" <rang...@google.com.invalid> a
>> écrit :
>>
>> > The core issue here is that there is no explicit concept of 'checkpoint'
>> > in Beam (UnboundedSource has a method 'getCheckpointMark', but that refers
>> > to the checkpoint on the external source). Runners do checkpoint
>> > internally as an implementation detail. Flink's checkpoint model is
>> > entirely different from Dataflow's and Spark's.
>> >
>> > @StableReplay helps, but it does not explicitly talk about a checkpoint,
>> > by design.
>> >
>> > If you are looking to achieve some guarantees with a sink/DoFn, I think
>> > it is better to start with the requirements. I worked on an exactly-once
>> > sink for Kafka (see KafkaIO.write().withEOS()), where we essentially
>> > reshard the elements and assign sequence numbers to elements within each
>> > shard. Duplicates in replays are avoided based on these sequence numbers.
>> > The DoFn state API is used to buffer out-of-order replays. The
>> > implementation strategy works in Dataflow but not in Flink, which has a
>> > horizontal checkpoint. KafkaIO checks for compatibility.
>> >
>> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> > rmannibu...@gmail.com>
>> > wrote:
>> >
>> > > Hi guys,
>> > >
>> > > The subject is a bit provocative, but the topic is real and comes up
>> > > again and again with Beam usage: how can a DoFn handle some
>> > > "chunking"?
>> > >
>> > > The need is to be able to commit every N records, with N not too big.
>> > >
>> > > The natural API for that in Beam is the bundle one, but bundles are not
>> > > reliable since they can be very small (Flink) - we can say it is "ok"
>> > > even if it has some perf impact - or too big (Spark does full size /
>> > > #workers).
>> > >
>> > > The workaround is what we see in the ES I/O: a maxSize which does an
>> > > eager flush. The issue is that the checkpoint is then not respected
>> > > and you can process the same records multiple times.
>> > >
>> > > Any plan to make this API reliable and controllable from a Beam point
>> > > of view (at least in a max manner)?
>> > >
>> > > Thanks,
>> > > Romain Manni-Bucau
>> > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> > >
>> >
>>
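
For context, the "maxSize eager flush" workaround mentioned above for the ES
I/O looks roughly like the following DoFn sketch. It only uses the standard
bundle hooks (@StartBundle/@ProcessElement/@FinishBundle); the class name,
MAX_SIZE constant and the commit call are placeholders, not the actual
ElasticsearchIO code:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    class ChunkingWriteFn extends DoFn<String, Void> {
      private static final int MAX_SIZE = 1000; // N records per commit
      private transient List<String> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        buffer.add(ctx.element());
        if (buffer.size() >= MAX_SIZE) {
          // eager flush: commits mid-bundle, so a replayed bundle
          // can re-commit the same records (the issue discussed above)
          flush();
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush(); // bundle-boundary flush
      }

      private void flush() {
        if (buffer.isEmpty()) {
          return;
        }
        // write/commit the chunk to the external system here
        buffer.clear();
      }
    }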