After giving this thread my best attempt at understanding exactly what is
the problem and the proposed solution, I'm afraid I still fail to
understand both. To reiterate, I think the only way to make progress here
is to be more concrete: (quote) take some IO that you think could be easier
to write with your proposed API, give the contents of a hypothetical
PCollection being written to this IO, give the code of a hypothetical DoFn
implementing the write using your API, and explain what you'd expect to
happen at runtime. I'm going to re-engage in this thread after such an
example is given.

On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> First bundle retry is unusable with dome runners like spark where the
> bundle size is the collection size / number of work. This means a user cant
> use bundle API or feature reliably and portably - which is beam promise.
> Aligning chunking and bundles would guarantee that bit can be not desired,
> that is why i thought it can be another feature.
>
> GBK works until the IO knows about that and both concepts are not always
> orthogonal - backpressure like systems is a trivial common example. This
> means the IO (dofn) must be able to do it itself at some point.
>
> Also note the GBK works only if the IO can take a list which is never the
> case today.
>
> Big questions for me are: is SDF the way to go since it provides the needed
> API bit is not yet supported? What about existing IO? Should beam provide
> an auto wrapping of dofn for that pre-aggregated support and simulate
> bundles to the actual IO impl to keep the existing API?
>
>
> Le 17 nov. 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid> a
> écrit :
>
> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <rmannibu...@gmail.com
> >
> wrote:
>
> > Yep, just take ES IO, if a part of a bundle fails you are in an
> > unmanaged state. This is the case for all O (of IO ;)). Issue is not
> > much about "1" (the code it takes) but more the fact it doesn't
> > integrate with runner features and retries potentially: what happens
> > if a bundle has a failure? => undefined today. 2. I'm fine with it
> > while we know exactly what happens when we restart after a bundle
> > failure. With ES the timestamp can be used for instance.
> >
>
> This deterministic batching can be achieved even now with an extra
> GroupByKey (and if you want ordering on top of that, will need another
> GBK). Don't know if that is too costly in your case. I would need bit more
> details on handling ES IO write retries to see it could be simplified. Note
> that retries occur with or without any failures in your DoFn.
>
> The biggest negative with GBK approach is that it doesn't provide same
> guarantees on Flink.
>
> I don't see how GroubIntoBatches in Beam provides specific guarantees on
> deterministic batches.
>
> Thinking about it the SDF is really a way to do it since the SDF will
> > manage the bulking and associated with the runner "retry" it seems it
> > covers the needs.
> >
> > Romain Manni-Bucau
> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >
> >
> > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid
> >:
> > > I must admit I'm still failing to understand the problem, so let's step
> > > back even further.
> > >
> > > Could you give an example of an IO that is currently difficult to
> > implement
> > > specifically because of lack of the feature you're talking about?
> > >
> > > I'm asking because I've reviewed almost all Beam IOs and don't recall
> > > seeing a similar problem. Sure, a lot of IOs do batching within a
> bundle,
> > > but 1) it doesn't take up much code (granted, it would be even easier
> if
> > > Beam did it for us) and 2) I don't remember any of them requiring the
> > > batches to be deterministic, and I'm having a hard time imagining what
> > kind
> > > of storage system would be able to deduplicate if batches were
> > > deterministic but wouldn't be able to deduplicate if they weren't.
> > >
> > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
> > rmannibu...@gmail.com>
> > > wrote:
> > >
> > >> Ok, let me try to step back and summarize what we have today and what
> I
> > >> miss:
> > >>
> > >> 1. we can handle chunking in beam through group in batch (or
> equivalent)
> > >> but:
> > >>    > it is not built-in into the transforms (IO) and it is controlled
> > >> from outside the transforms so no way for a transform to do it
> > >> properly without handling itself a composition and links between
> > >> multiple dofns to have notifications and potentially react properly or
> > >> handle backpressure from its backend
> > >> 2. there is no restart feature because there is no real state handling
> > >> at the moment. this sounds fully delegated to the runner but I was
> > >> hoping to have more guarantees from the used API to be able to restart
> > >> a pipeline (mainly batch since it can be irrelevant or delegates to
> > >> the backend for streams) and handle only not commited records so it
> > >> requires some persistence outside the main IO storages to do it
> > >> properly
> > >>    > note this is somehow similar to the monitoring topic which miss
> > >> persistence ATM so it can end up to beam to have a pluggable storage
> > >> for a few concerns
> > >>
> > >>
> > >> Short term I would be happy with 1 solved properly, long term I hope 2
> > >> will be tackled without workarounds requiring custom wrapping of IO to
> > >> use a custom state persistence.
> > >>
> > >>
> > >>
> > >> Romain Manni-Bucau
> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >>
> > >>
> > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> > >> > Thanks for the explanation. Agree, we might talk about different
> > things
> > >> > using the same wording.
> > >> >
> > >> > I'm also struggling to understand the use case (for a generic DoFn).
> > >> >
> > >> > Regards
> > >> > JB
> > >> >
> > >> >
> > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
> > >> >>
> > >> >> To avoid spending a lot of time pursuing a false path, I'd like to
> > say
> > >> >> straight up that SDF is definitely not going to help here, despite
> > the
> > >> >> fact
> > >> >> that its API includes the term "checkpoint". In SDF, the
> "checkpoint"
> > >> >> captures the state of processing within a single element. If you're
> > >> >> applying an SDF to 1000 elements, it will, like any other DoFn, be
> > >> applied
> > >> >> to each of them independently and in parallel, and you'll have 1000
> > >> >> checkpoints capturing the state of processing each of these
> elements,
> > >> >> which
> > >> >> is probably not what you want.
> > >> >>
> > >> >> I'm afraid I still don't understand what kind of checkpoint you
> > need, if
> > >> >> it
> > >> >> is not just deterministic grouping into batches. "Checkpoint" is a
> > very
> > >> >> broad term and it's very possible that everybody in this thread is
> > >> talking
> > >> >> about different things when saying it. So it would help if you
> could
> > >> give
> > >> >> a
> > >> >> more concrete example: for example, take some IO that you think
> > could be
> > >> >> easier to write with your proposed API, give the contents of a
> > >> >> hypothetical
> > >> >> PCollection being written to this IO, give the code of a
> hypothetical
> > >> DoFn
> > >> >> implementing the write using your API, and explain what you'd
> expect
> > to
> > >> >> happen at runtime.
> > >> >>
> > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
> > >> >> <rmannibu...@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >>> @Eugene: yes and the other alternative of Reuven too but it is
> still
> > >> >>> 1. relying on timers, 2. not really checkpointed
> > >> >>>
> > >> >>> In other words it seems all solutions are to create a chunk of
> size
> > 1
> > >> >>> and replayable to fake the lack of chunking in the framework. This
> > >> >>> always implies a chunk handling outside the component (typically
> > >> >>> before for an output). My point is I think IO need it in their own
> > >> >>> "internal" or at least control it themselves since the chunk size
> is
> > >> >>> part of the IO handling most of the time.
> > >> >>>
> > >> >>> I think JB spoke of the same "group before" trick using
> restrictions
> > >> >>> which can work I have to admit if SDF are implemented by runners.
> Is
> > >> >>> there a roadmap/status on that? Last time I checked SDF was a
> great
> > >> >>> API without support :(.
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>> Romain Manni-Bucau
> > >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >>>
> > >> >>>
> > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
> > >> >>> <kirpic...@google.com.invalid>:
> > >> >>>>
> > >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and
> > the
> > >> >>>> post
> > >> >>>> doesn't mention the word. Did you mean something else, e.g.
> > >> restriction
> > >> >>>> perhaps? Either way I don't think SDFs are the solution here;
> SDFs
> > >> have
> > >> >>>
> > >> >>> to
> > >> >>>>
> > >> >>>> do with the ability to split the processing of *a single element*
> > over
> > >> >>>> multiple calls, whereas Romain I think is asking for repeatable
> > >> grouping
> > >> >>>
> > >> >>> of
> > >> >>>>
> > >> >>>> *multiple* elements.
> > >> >>>>
> > >> >>>> Romain - does
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> https://github.com/apache/beam/blob/master/sdks/java/
> > core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> > >> >>>>
> > >> >>>> do what
> > >> >>>> you want?
> > >> >>>>
> > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
> > >> j...@nanthrax.net>
> > >> >>>> wrote:
> > >> >>>>
> > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no ?
> > >> >>>>>
> > >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >> >>>>>
> > >> >>>>> Regards
> > >> >>>>> JB
> > >> >>>>>
> > >> >>>>>
> > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> > >> >>>>>>
> > >> >>>>>> it gives the fn/transform the ability to save a state - it can
> > get
> > >> >>>>>> back on "restart" / whatever unit we can use, probably runner
> > >> >>>>>> dependent? Without that you need to rewrite all IO usage with
> > >> >>>>>> something like the previous pattern which makes the IO not self
> > >> >>>>>> sufficient and kind of makes the entry cost and usage of beam
> way
> > >> >>>>>> further.
> > >> >>>>>>
> > >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses but
> > adapted
> > >> to
> > >> >>>>>> beam (stream in particular) case.
> > >> >>>>>>
> > >> >>>>>> Romain Manni-Bucau
> > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >>>>>>
> > >> >>>>>>
> > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid
> >:
> > >> >>>>>>>
> > >> >>>>>>> Romain,
> > >> >>>>>>>
> > >> >>>>>>> Can you define what you mean by checkpoint? What are the
> > semantics,
> > >> >>>
> > >> >>> what
> > >> >>>>>>>
> > >> >>>>>>> does it accomplish?
> > >> >>>>>>>
> > >> >>>>>>> Reuven
> > >> >>>>>>>
> > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
> > >> >>>>>
> > >> >>>>> rmannibu...@gmail.com>
> > >> >>>>>>>
> > >> >>>>>>> wrote:
> > >> >>>>>>>
> > >> >>>>>>>> Yes, what I propose earlier was:
> > >> >>>>>>>>
> > >> >>>>>>>> I. checkpoint marker:
> > >> >>>>>>>>
> > >> >>>>>>>> @AnyBeamAnnotation
> > >> >>>>>>>> @CheckpointAfter
> > >> >>>>>>>> public void someHook(SomeContext ctx);
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> II. pipeline.apply(ParDo.of(new
> > >> MyFn()).withCheckpointAlgorithm(new
> > >> >>>>>>>> CountingAlgo()))
> > >> >>>>>>>>
> > >> >>>>>>>> III. (I like this one less)
> > >> >>>>>>>>
> > >> >>>>>>>> // in the dofn
> > >> >>>>>>>> @CheckpointTester
> > >> >>>>>>>> public boolean shouldCheckpoint();
> > >> >>>>>>>>
> > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn
> per
> > >> >>>
> > >> >>> element
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> Romain Manni-Bucau
> > >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
> > <rang...@google.com.invalid
> > >> >>>>
> > >> >>>> :
> > >> >>>>>>>>>
> > >> >>>>>>>>> How would you define it (rough API is fine)?. Without more
> > >> details,
> > >> >>>>>
> > >> >>>>> it is
> > >> >>>>>>>>>
> > >> >>>>>>>>> not easy to see wider applicability and feasibility in
> > runners.
> > >> >>>>>>>>>
> > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
> > >> >>>>>>>>
> > >> >>>>>>>> rmannibu...@gmail.com>
> > >> >>>>>>>>>
> > >> >>>>>>>>> wrote:
> > >> >>>>>>>>>
> > >> >>>>>>>>>> This is a fair summary of the current state but also where
> > beam
> > >> >>>
> > >> >>> can
> > >> >>>>>>>>
> > >> >>>>>>>> have a
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> very strong added value and make big data great and smooth.
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> Instead of this replay feature isnt checkpointing willable?
> > In
> > >> >>>>>>>>
> > >> >>>>>>>> particular
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> with SDF no?
> > >> >>>>>>>>>>
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi"
> > >> <rang...@google.com.invalid>
> > >> >>>
> > >> >>> a
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> écrit :
> > >> >>>>>>>>>>
> > >> >>>>>>>>>>> Core issue here is that there is no explicit concept of
> > >> >>>
> > >> >>> 'checkpoint'
> > >> >>>>>>>>
> > >> >>>>>>>> in
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> Beam (UnboundedSource has a method 'getCheckpointMark' but
> > that
> > >> >>>>>>>>
> > >> >>>>>>>> refers to
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> the checkoint on external source). Runners do checkpoint
> > >> >>>
> > >> >>> internally
> > >> >>>>>
> > >> >>>>> as
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> implementation detail. Flink's checkpoint model is
> entirely
> > >> >>>>>
> > >> >>>>> different
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> from
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> Dataflow's and Spark's.
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about
> a
> > >> >>>>>>>>
> > >> >>>>>>>> checkpoint
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> by
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> design.
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> If you are looking to achieve some guarantees with a
> > >> sink/DoFn, I
> > >> >>>>>>>>
> > >> >>>>>>>> think
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> it
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> is better to start with the requirements. I worked on
> > >> >>>
> > >> >>> exactly-once
> > >> >>>>>>>>
> > >> >>>>>>>> sink
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> for
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we
> essentially
> > >> >>>
> > >> >>> reshard
> > >> >>>>>>>>
> > >> >>>>>>>> the
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> elements and assign sequence numbers to elements with in
> > each
> > >> >>>
> > >> >>> shard.
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> Duplicates in replays are avoided based on these sequence
> > >> >>>
> > >> >>> numbers.
> > >> >>>>>>>>
> > >> >>>>>>>> DoFn
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> state API is used to buffer out-of order replays. The
> > >> >>>
> > >> >>> implementation
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> strategy works in Dataflow but not in Flink which has a
> > >> >>>
> > >> >>> horizontal
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> checkpoint. KafkaIO checks for compatibility.
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
> > >> >>>>>>>>>>> rmannibu...@gmail.com>
> > >> >>>>>>>>>>> wrote:
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>>> Hi guys,
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> The subject is a bit provocative but the topic is real
> and
> > >> >>>
> > >> >>> coming
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> again and again with the beam usage: how a dofn can
> handle
> > >> some
> > >> >>>>>>>>>>>> "chunking".
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> The need is to be able to commit each N records but with
> N
> > not
> > >> >>>
> > >> >>> too
> > >> >>>>>>>>
> > >> >>>>>>>> big.
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> The natural API for that in beam is the bundle one but
> > bundles
> > >> >>>
> > >> >>> are
> > >> >>>>>>>>
> > >> >>>>>>>> not
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> reliable since they can be very small (flink) - we can
> say
> > it
> > >> is
> > >> >>>>>>>>
> > >> >>>>>>>> "ok"
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> even if it has some perf impacts - or too big (spark does
> > full
> > >> >>>
> > >> >>> size
> > >> >>>>>>>>
> > >> >>>>>>>> /
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> #workers).
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize
> > which
> > >> >>>
> > >> >>> does
> > >> >>>>>
> > >> >>>>> an
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> eager flush. The issue is that then the checkpoint is not
> > >> >>>
> > >> >>> respected
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> and you can process multiple times the same records.
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from
> a
> > >> beam
> > >> >>>>>>>>
> > >> >>>>>>>> point
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> of view (at least in a max manner)?
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> Thanks,
> > >> >>>>>>>>>>>> Romain Manni-Bucau
> > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>
> > >> >>>>> --
> > >> >>>>> Jean-Baptiste Onofré
> > >> >>>>> jbono...@apache.org
> > >> >>>>> http://blog.nanthrax.net
> > >> >>>>> Talend - http://www.talend.com
> > >> >>>>>
> > >> >>>
> > >> >>
> > >> >
> > >> > --
> > >> > Jean-Baptiste Onofré
> > >> > jbono...@apache.org
> > >> > http://blog.nanthrax.net
> > >> > Talend - http://www.talend.com
> > >>
> >
>

Reply via email to