Romain,

Can you define what you mean by checkpoint? What are the semantics, and what does it accomplish?
Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Yes, what I proposed earlier was:
>
> I. checkpoint marker:
>
> @AnyBeamAnnotation
> @CheckpointAfter
> public void someHook(SomeContext ctx);
>
> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
>
> III. (I like this one less)
>
> // in the dofn
> @CheckpointTester
> public boolean shouldCheckpoint();
>
> IV. @Checkpointer Serializable getCheckpoint(); in the DoFn, per element
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>
> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
> > How would you define it (a rough API is fine)? Without more details, it is
> > not easy to see wider applicability and feasibility in runners.
> >
> > On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >
> >> This is a fair summary of the current state, but also an area where Beam
> >> can have very strong added value and make big data great and smooth.
> >>
> >> Instead of this replay feature, isn't checkpointing viable? In particular
> >> with SDF, no?
> >>
> >> On 16 Nov 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:
> >>
> >> > The core issue here is that there is no explicit concept of 'checkpoint'
> >> > in Beam (UnboundedSource has a method 'getCheckpointMark', but that
> >> > refers to the checkpoint on the external source). Runners do checkpoint
> >> > internally as an implementation detail. Flink's checkpoint model is
> >> > entirely different from Dataflow's and Spark's.
> >> >
> >> > @StableReplay helps, but by design it does not explicitly talk about a
> >> > checkpoint.
> >> >
> >> > If you are looking to achieve some guarantees with a sink/DoFn, I think
> >> > it is better to start with the requirements. I worked on an exactly-once
> >> > sink for Kafka (see KafkaIO.write().withEOS()), where we essentially
> >> > reshard the elements and assign sequence numbers to elements within each
> >> > shard. Duplicates in replays are avoided based on these sequence numbers.
> >> > The DoFn state API is used to buffer out-of-order replays. The
> >> > implementation strategy works in Dataflow but not in Flink, which has a
> >> > horizontal checkpoint. KafkaIO checks for compatibility.
> >> >
> >> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >> >
> >> > > Hi guys,
> >> > >
> >> > > The subject is a bit provocative, but the topic is real and comes up
> >> > > again and again with Beam usage: how can a DoFn handle some "chunking"?
> >> > >
> >> > > The need is to be able to commit every N records, but with N not too big.
> >> > >
> >> > > The natural API for that in Beam is the bundle one, but bundles are
> >> > > not reliable since they can be very small (Flink) - we can say it is
> >> > > "ok" even if it has some perf impacts - or too big (Spark does full
> >> > > size / #workers).
> >> > >
> >> > > The workaround is what we see in the ES I/O: a maxSize which does an
> >> > > eager flush. The issue is that then the checkpoint is not respected
> >> > > and you can process the same records multiple times.
> >> > >
> >> > > Any plan to make this API reliable and controllable from a Beam point
> >> > > of view (at least in a max manner)?
> >> > >
> >> > > Thanks,
> >> > > Romain Manni-Bucau
> >> > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
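
To make the "maxSize eager flush" workaround from Romain's first message concrete, here is a minimal DoFn sketch of that pattern. writeBatch() is a hypothetical stand-in for a real bulk client call and MAX_SIZE is an arbitrary value; note how a flush triggered inside processElement is not aligned with any runner checkpoint, so a retried bundle re-writes records it already flushed:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch of the eager-flush workaround: buffer elements and flush
    // every MAX_SIZE records. writeBatch() is a hypothetical placeholder
    // for the real sink client (e.g. an Elasticsearch bulk request).
    class EagerFlushFn extends DoFn<String, Void> {
      private static final int MAX_SIZE = 100; // arbitrary chunk size
      private transient List<String> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        buffer.add(ctx.element());
        if (buffer.size() >= MAX_SIZE) {
          flush(); // eager flush: not tied to any runner checkpoint
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush(); // bundle boundary: the only flush the runner knows about
      }

      private void flush() {
        if (buffer.isEmpty()) {
          return;
        }
        writeBatch(buffer); // a replayed bundle re-runs this for flushed data
        buffer.clear();
      }

      private void writeBatch(List<String> records) {
        // hypothetical external bulk write
      }
    }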
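
For the exactly-once Kafka sink Raghu mentions, a rough usage fragment might look like the following; the broker address, topic, shard count, and sink group id are all placeholders, and the input is assumed to be a PCollection<KV<String, String>> named "input":

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    // withEOS reshards the input into numShards shards and assigns
    // per-shard sequence numbers so replayed elements can be
    // de-duplicated; KafkaIO checks that the runner is compatible.
    input.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")      // placeholder address
        .withTopic("results")                       // placeholder topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(5, "my-sink-group"));              // numShards, sinkGroupId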
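
And to visualize Romain's option II: none of these checkpoint types exist in Beam, so the sketch below only illustrates what a pluggable checkpoint algorithm could look like, with CheckpointAlgorithm, CountingAlgo's constructor argument, and the withCheckpointAlgorithm call all being entirely hypothetical:

    import java.io.Serializable;

    // HYPOTHETICAL: neither CheckpointAlgorithm nor withCheckpointAlgorithm
    // exists in Beam; this only illustrates the shape of option II.
    public interface CheckpointAlgorithm extends Serializable {
      // The runner would ask after each element whether to persist progress.
      boolean shouldCheckpoint(long elementsSinceLastCheckpoint);
    }

    public class CountingAlgo implements CheckpointAlgorithm {
      private final long every;

      public CountingAlgo(long every) {
        this.every = every;
      }

      @Override
      public boolean shouldCheckpoint(long elementsSinceLastCheckpoint) {
        return elementsSinceLastCheckpoint >= every;
      }
    }

    // Hypothetical wiring, mirroring Romain's snippet:
    // pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo(1000)));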