After giving this thread my best attempt at understanding exactly what the problem and the proposed solution are, I'm afraid I still fail to understand both. To reiterate, I think the only way to make progress here is to be more concrete: "take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime." I'm going to re-engage in this thread after such an example is given.

On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

First, bundle retry is unusable with some runners like Spark, where the bundle size is the collection size / number of workers. This means a user can't use the bundle API or feature reliably and portably - which is the Beam promise. Aligning chunking and bundles would guarantee that, but it can be undesired; that is why I thought it could be another feature.

GBK works until the IO needs to know about it, and both concepts are not always orthogonal - backpressure-like systems are a trivial common example. This means the IO (DoFn) must be able to do it itself at some point.

Also note that GBK works only if the IO can take a list, which is never the case today.

Big questions for me are: is SDF the way to go, since it provides the needed API but is not yet supported? What about existing IOs? Should Beam provide an automatic wrapping of the DoFn for that pre-aggregated support and simulate bundles for the actual IO implementation, to keep the existing API?

On 17 Nov 2017 at 19:20, "Raghu Angadi" <rang...@google.com.invalid> wrote:

On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> Yep, just take ES IO: if a part of a bundle fails you are in an unmanaged state. This is the case for all O (of IO ;)). The issue is not so much about "1" (the code it takes) but more the fact that it doesn't integrate with runner features and retries potentially: what happens if a bundle has a failure? => undefined today. 2. I'm fine with it as long as we know exactly what happens when we restart after a bundle failure. With ES the timestamp can be used, for instance.

This deterministic batching can be achieved even now with an extra GroupByKey (and if you want ordering on top of that, it will need another GBK). Don't know if that is too costly in your case. I would need a bit more detail on handling ES IO write retries to see if it could be simplified. Note that retries occur with or without any failures in your DoFn.

The biggest negative of the GBK approach is that it doesn't provide the same guarantees on Flink.

I don't see how GroupIntoBatches in Beam provides specific guarantees on deterministic batches.

> Thinking about it, the SDF is really a way to do it, since the SDF will manage the bulking and, associated with the runner "retry", it seems it covers the needs.

2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:

I must admit I'm still failing to understand the problem, so let's step back even further.

Could you give an example of an IO that is currently difficult to implement specifically because of the lack of the feature you're talking about?

I'm asking because I've reviewed almost all Beam IOs and don't recall seeing a similar problem. Sure, a lot of IOs do batching within a bundle, but 1) it doesn't take up much code (granted, it would be even easier if Beam did it for us) and 2) I don't remember any of them requiring the batches to be deterministic, and I'm having a hard time imagining what kind of storage system would be able to deduplicate if batches were deterministic but wouldn't be able to deduplicate if they weren't.
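
For reference, the bundle-level batching Eugene describes usually looks roughly like the following in the Java SDK (a minimal sketch; MyClient and its bulkWrite() method are hypothetical stand-ins for a real storage client). The eager flush on MAX_BATCH_SIZE is exactly the part Romain notes is not tied to any checkpoint:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class BatchingWriteFn extends DoFn<String, Void> {
  private static final int MAX_BATCH_SIZE = 1000;
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= MAX_BATCH_SIZE) {
      flush(); // eager flush: not aligned with any runner checkpoint
    }
  }

  @FinishBundle
  public void finishBundle() {
    flush(); // flush whatever is left at the end of the bundle
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      MyClient.bulkWrite(buffer); // hypothetical bulk call; may run again if the bundle is retried
      buffer.clear();
    }
  }
}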

On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Ok, let me try to step back and summarize what we have today and what I miss:

1. we can handle chunking in Beam through group-into-batches (or equivalent), but it is not built into the transforms (IO) and it is controlled from outside the transforms, so there is no way for a transform to do it properly without itself handling a composition and links between multiple DoFns to get notifications and potentially react properly or handle backpressure from its backend.

2. there is no restart feature because there is no real state handling at the moment. This sounds fully delegated to the runner, but I was hoping for more guarantees from the API used, to be able to restart a pipeline (mainly batch, since it can be irrelevant or delegated to the backend for streams) and handle only uncommitted records, so it requires some persistence outside the main IO storages to do it properly. Note this is somehow similar to the monitoring topic, which is missing persistence ATM, so it can end up with Beam having a pluggable storage for a few concerns.

Short term I would be happy with 1 solved properly; long term I hope 2 will be tackled without workarounds requiring custom wrapping of IOs to use a custom state persistence.

2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:

Thanks for the explanation. Agree, we might be talking about different things while using the same wording.

I'm also struggling to understand the use case (for a generic DoFn).

Regards
JB

On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:

To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element. If you're applying an SDF to 1000 elements, it will, like any other DoFn, be applied to each of them independently and in parallel, and you'll have 1000 checkpoints capturing the state of processing each of these elements, which is probably not what you want.

I'm afraid I still don't understand what kind of checkpoint you need, if it is not just deterministic grouping into batches. "Checkpoint" is a very broad term and it's very possible that everybody in this thread is talking about different things when saying it. So it would help if you could give a more concrete example: for example, take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime.
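
The "extra GroupByKey" approach Raghu suggests earlier in the thread - chunking controlled from outside the transform, which is what Romain's point 1 summarizes - would look roughly like this (a sketch, assuming input is a PCollection<String> and writeBatch() is a hypothetical bulk call on the backend client):

input
    .apply("AssignShard", ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // deterministic key derived from the element itself, so a retried
        // element always lands in the same batch
        c.output(KV.of(Math.floorMod(c.element().hashCode(), 50), c.element()));
      }
    }))
    .apply(GroupByKey.<Integer, String>create())
    .apply("WriteBatches", ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        writeBatch(c.element().getValue()); // hypothetical bulk write of one batch
      }
    }));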

On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

@Eugene: yes, and the other alternative from Reuven too, but it is still 1. relying on timers, 2. not really checkpointed.

In other words, it seems all solutions are to create a chunk of size 1, replayable, to fake the lack of chunking in the framework. This always implies chunk handling outside the component (typically before it, for an output). My point is that I think IOs need it in their own "internals", or at least need to control it themselves, since the chunk size is part of the IO handling most of the time.

I think JB spoke of the same "group before" trick using restrictions, which can work, I have to admit, if SDFs are implemented by runners. Is there a roadmap/status on that? Last time I checked SDF was a great API without support :(.

2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:

JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the word. Did you mean something else, e.g. restriction perhaps? Either way I don't think SDFs are the solution here; SDFs have to do with the ability to split the processing of *a single element* over multiple calls, whereas Romain I think is asking for repeatable grouping of *multiple* elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what you want?
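
For readers following along, using GroupIntoBatches looks roughly like this (a sketch; it requires a keyed input, here assumed to be a PCollection<KV<String, String>> named keyed, and writeBatch() is again a hypothetical bulk call). Note the batches are size-based rather than deterministic, which is the property Raghu questions earlier in the thread:

keyed
    .apply(GroupIntoBatches.<String, String>ofSize(100))
    .apply("WriteBatch", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        writeBatch(c.element().getValue()); // hypothetical bulk write of up to 100 elements
      }
    }));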

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

It sounds like the "Trigger" in the Splittable DoFn, no?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB

On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

It gives the fn/transform the ability to save a state - one it can get back on "restart" / whatever unit we can use, probably runner dependent? Without that you need to rewrite all IO usage with something like the previous pattern, which makes the IO not self-sufficient and kind of pushes the entry cost and usage of Beam way further.

In my mind it is exactly what jbatch/spring-batch uses, but adapted to the Beam (stream in particular) case.

2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:

Romain,

Can you define what you mean by checkpoint? What are the semantics, what does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Yes, what I proposed earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);

II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn, per element

2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:

How would you define it (a rough API is fine)? Without more details, it is not easy to see the wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

This is a fair summary of the current state, but also of where Beam can have a very strong added value and make big data great and smooth.

Instead of this replay feature, isn't checkpointing doable? In particular with SDF, no?

On 16 Nov 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:

The core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to the checkpoint on the external source). Runners do checkpoint internally as an implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a checkpoint by design.

If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements. I worked on an exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to elements within each shard. Duplicates in replays are avoided based on these sequence numbers. The DoFn state API is used to buffer out-of-order replays. The implementation strategy works in Dataflow but not in Flink, which has a horizontal checkpoint. KafkaIO checks for compatibility.
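
A drastically simplified illustration of the resharding-plus-sequence-number idea Raghu describes (this is not the actual KafkaIO implementation: it drops late replays instead of buffering out-of-order ones, the input shape KV<shard, KV<sequence number, record>> is assumed to be produced upstream, and writeToSink() is hypothetical):

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class DedupBySequenceFn extends DoFn<KV<String, KV<Long, String>>, Void> {

  // highest sequence number already written for this shard
  @StateId("lastSeq")
  private final StateSpec<ValueState<Long>> lastSeqSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("lastSeq") ValueState<Long> lastSeq) {
    long seq = c.element().getValue().getKey();
    Long last = lastSeq.read();
    if (last != null && seq <= last) {
      return; // element was already written by an earlier attempt: skip it
    }
    writeToSink(c.element().getValue().getValue()); // hypothetical external write
    lastSeq.write(seq);
  }

  private void writeToSink(String record) {
    // hypothetical; stands in for the actual sink client call
  }
}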

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Hi guys,

The subject is a bit provocative, but the topic is real and comes up again and again with Beam usage: how can a DoFn handle some "chunking"?

The need is to be able to commit every N records, but with N not too big.

The natural API for that in Beam is the bundle one, but bundles are not reliable since they can be very small (Flink) - we can say it is "ok" even if it has some perf impact - or too big (Spark does full size / #workers).

The workaround is what we see in the ES I/O: a maxSize which does an eager flush. The issue is that then the checkpoint is not respected and you can process the same records multiple times.

Any plan to make this API reliable and controllable from a Beam point of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau