@Ben: would all IOs be rewritten to use that, with the bundle concept
dropped from the API, to avoid the ambiguity and misleading usage we
see in current IOs?
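
For context, the batching pattern discussed in the quoted thread (buffer
elements, flush when a max size is reached, flush the remainder when the
bundle ends) can be sketched in plain Java. This is only an illustration
under assumptions, not Beam code: BatchingWriter, processElement and
finishBundle are hypothetical names that mirror the roles of a DoFn's
@ProcessElement and @FinishBundle methods, and the sink stands in for a
bulk request to the backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the batching logic of a DoFn-based writer. */
final class BatchingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> sink; // assumed bulk-write call to the backend
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int maxBatchSize, Consumer<List<T>> sink) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
    }

    /** Plays the role of @ProcessElement: buffer, and flush once the cap is hit. */
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Plays the role of @FinishBundle: flush whatever is left in the buffer. */
    void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // Hand a copy to the sink so clearing the buffer cannot affect it.
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With a max batch size of 3 and 7 elements in one bundle, the sink
receives batches of 3, 3 and 1 elements, however large the runner made
the bundle. The batch boundaries still depend on where the runner cuts
bundles, so this sketch does not make batches deterministic.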

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-30 18:43 GMT+01:00 Ben Chambers <bchamb...@google.com>:
> Beam includes a GroupIntoBatches transform (see
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java)
> which I believe was intended to be used as part of such a portable IO. It
> can be used to request that elements are divided into batches of some size
> which can then be used for further processing.
>
> On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>>
>> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>> > Very strong -1 from me:
>> > - Having a pipeline-global parameter is bad because it will apply to all
>> > transforms, with no ability to control it for individual transforms.
>> > This
>> > can go especially poorly because it means that when I write a transform,
>> > I
>> > don't know whether a user will set this parameter in their pipeline to a
>> > value that's perhaps good for the user's transform but really bad for my
>> > transform; and the user will likely blame my transform for poor
>> > performance.
>> > A parameter like this should be set on exactly the thing it applies to:
>> > e.g.
>> > on the particular IO; and it should be set by the IO itself, not by a
>> > user
>> > in pipeline options, because the IO author likely knows better than a
>> > user
>> > what is a good value.
>>
>> This is true, and it is worse today: the user can't tune it, but the
>> IO doesn't handle it either. It is up to the runner, and none
>> implements it in an IO-friendly way (check Flink and Spark, which do
>> the exact opposite: bundle=1 vs bundle=dataset/partitions).
>>
>> Also note it is a "max" and not an exact value in the proposal.
>>
>> > - The parameter will not achieve what many IOs want, either. In some
>> > cases,
>> > you want to limit the number of bytes you write. In some cases, you want
>> > to
>> > limit the number of values within a key that you write. In some cases,
>> > it's
>> > something else - it isn't always elements.
>>
>> Elements is the only thing users can really tune since you can't
>> assume the content.
>>
>> > - The parameter will address none of the issues that I think you raised
>> > in the thread above: it doesn't give deterministic replay, nor any kind
>> > of fault tolerance.
>>
>> Right, it only partially solves the first issue popping up: the
>> chunking. However I think it is a quick win.
>>
>> > - Having a parameter like this *at all* goes against Beam's "no knobs"
>> > philosophy - for all the usual reasons: 1) it encourages users to waste
>> > time
>> > looking in the wrong places when doing performance tuning: tuning
>> > parameters
>> > is almost never the best way to improve performance; 2) when users can
>> > set a
>> > tuning parameter, in my experience it is almost always set wrong, or
>> > perhaps
>> > it was once set right but then nobody updates it when the use case or
>> > implementation changes; and we can end up in a situation where the
>> > pipeline
>> > is performing poorly because of the parameter but the runner isn't
>> > allowed
>> > to choose a better value. (in experience with legacy data processing
>> > systems
>> > in Google, like MapReduce, that support plenty of tuning parameters, a
>> > very
>> > common advice to someone complaining about a poorly performing job is
>> > "have
>> > you tried removing all your parameters?")
>>
>> I would be fine with that but what is the alternative?
>>
>> > - I still fail to understand the exact issue we're talking about, and
>> > I've
>> > made a number of suggestions as to how this understanding could be
>> > achieved:
>> > show code that demonstrates the issue; and show how the code could be
>> > improved by a hypothetical API.
>>
>> The first immediately blocking issue is how to batch records reliably
>> and *portably* (the biggest Beam added value IMHO).
>> Since bundles are the "flush" trigger for most IOs, this means ensuring
>> the bundle size is somehow controllable, or at least not set to a very
>> small value OOTB.
>>
>> An alternative to this proposal would be to let an IO give a hint about
>> its desired bundle size. That would work as well for this particular
>> issue. Does it sound better?
>>
>> >
>> > On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré <j...@nanthrax.net>
>> > wrote:
>> >>
>> >> It sounds reasonable to me.
>> >>
>> >> And agree for Spark, I would like to merge Spark 2 update first.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
>> >> > Guys,
>> >> >
>> >> > what about moving getMaxBundleSize from flink options to pipeline
>> >> > options. I think all runners can support it right? Spark code needs
>> >> > the merge of the v2 before being able to be implemented probably but
>> >> > I
>> >> > don't see any blocker.
>> >> >
>> >> > wdyt?
>> >> >
>> >> > Romain Manni-Bucau
>> >> >
>> >> >
>> >> > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:
>> >> >> @Eugene: "workaround" because it is specific to the IO each time,
>> >> >> and it therefore still highlights a gap in the core.
>> >> >>
>> >> >> Other comments inline
>> >> >>
>> >> >>
>> >> >> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
>> >> >> <rober...@google.com.invalid>:
>> >> >>> There is a possible fourth issue that we don't handle well:
>> >> >>> efficiency. For very large bundles, it may be advantageous to
>> >> >>> avoid replaying a bunch of idempotent operations if there were a
>> >> >>> way to record which ones we're sure went through. Not sure if
>> >> >>> that's the issue here (though one could possibly do this with
>> >> >>> SDFs, by preemptively returning periodically before an element
>> >> >>> (or portion thereof) is done).
>> >> >>
>> >> >> +1, it also leads to the IO handling its own chunking/bundles and
>> >> >> therefore solves all issues at once.
>> >> >>
>> >> >>>
>> >> >>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>> >> >>> kirpic...@google.com.invalid> wrote:
>> >> >>>
>> >> >>>> I disagree that the usage of document id in ES is a "workaround" -
>> >> >>>> it does not address any *accidental* complexity
>> >> >>>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from
>> >> >>>> shortcomings of Beam; it addresses the *essential* complexity of
>> >> >>>> a distributed system, which forces one to take it as a fact of
>> >> >>>> nature that the same write (mutation) will happen multiple times.
>> >> >>>> So if you want a mutation to happen "as-if" it happened exactly
>> >> >>>> once, the mutation itself must be idempotent
>> >> >>>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id
>> >> >>>> (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic
>> >> >>>> example of an idempotent mutation, and it's very good that
>> >> >>>> Elasticsearch provides it - if it didn't, no matter how good of
>> >> >>>> an API Beam had, achieving exactly-once writes would be
>> >> >>>> theoretically impossible. Are we in agreement on this so far?
>> >> >>>>
>> >> >>>> Next: you seem to be discussing 3 issues together, all of which
>> >> >>>> are
>> >> >>>> valid
>> >> >>>> issues, but they seem unrelated to me:
>> >> >>>> 1. Exactly-once mutation
>> >> >>>> 2. Batching multiple mutations into one RPC.
>> >> >>>> 3. Backpressure
>> >> >>>>
>> >> >>>> #1: was considered above. The system the IO is talking to has to
>> >> >>>> support
>> >> >>>> idempotent mutations, in an IO-specific way, and the IO has to
>> >> >>>> take
>> >> >>>> advantage of them, in the IO-specific way - end of story.
>> >> >>
>> >> >> Agree but don't forget the original point was about "chunks" and not
>> >> >> individual records.
>> >> >>
>> >> >>>>
>> >> >>>> #2: a batch of idempotent operations is also idempotent, so this
>> >> >>>> doesn't
>> >> >>>> add anything new semantically. Syntactically - Beam already allows
>> >> >>>> you to
>> >> >>>> write your own batching by notifying you of permitted batch
>> >> >>>> boundaries
>> >> >>>> (Start/FinishBundle). Sure, it could do more, but from my
>> >> >>>> experience
>> >> >>>> the
>> >> >>>> batching in IOs I've seen is one of the easiest and least
>> >> >>>> error-prone
>> >> >>>> parts, so I don't see something worth an extended discussion here.
>> >> >>
>> >> >> "Beam already allows you to
>> >> >>   write your own batching by notifying you of permitted batch
>> >> >> boundaries
>> >> >>   (Start/FinishBundle)"
>> >> >>
>> >> >> is wrong, since the bundle is potentially the whole PCollection
>> >> >> (Spark), so it is not even an option until you use SDF (back to
>> >> >> the same point).
>> >> >> Once again the API looks fine, but no implementation makes it true.
>> >> >> It would be easy to change in Spark; Flink can be OK since it
>> >> >> targets the streaming case more. Not sure about the others, any
>> >> >> idea?
>> >> >>
>> >> >>
>> >> >>>>
>> >> >>>> #3: handling backpressure is a complex problem with multiple
>> >> >>>> facets:
>> >> >>>> 1) how
>> >> >>>> do you know you're being throttled, and by how much are you
>> >> >>>> exceeding
>> >> >>>> the
>> >> >>>> external system's capacity?
>> >> >>
>> >> >> This is the whole point of backpressure: the system sends it back
>> >> >> to you (through a header or a status, in general).
>> >> >>
>> >> >>>> 2) how do you communicate this signal to the
>> >> >>>> runner?
>> >> >>
>> >> >> You are a client, so you get the metadata in the response,
>> >> >> whatever the technology.
>> >> >>
>> >> >>>> 3) what does the runner do in response?
>> >> >>
>> >> >> The runner does nothing, but the IO adapts its handling as
>> >> >> mentioned before (wait and retry, skip, ... depending on the
>> >> >> config).
>> >> >>
>> >> >>>> 4) how do you wait until
>> >> >>>> it's ok to try again?
>> >> >>
>> >> >> This is one point that Beam could probably improve, but waiting
>> >> >> in the processing is an option if the source has some buffering;
>> >> >> otherwise it requires a fallback buffer with a max size if the
>> >> >> wait mode is activated.
>> >> >>
>> >> >>>> You seem to be advocating for solving one facet of this problem,
>> >> >>>> which is:
>> >> >>>> you want it to be possible to signal to the runner "I'm being
>> >> >>>> throttled,
>> >> >>>> please end the bundle", right? If so - I think this (ending the
>> >> >>>> bundle) is
>> >> >>>> unnecessary: the DoFn can simply do an exponential back-off sleep
>> >> >>>> loop.
>> >> >>
>> >> >> Agreed, I never said the runner should know, but GBK+output
>> >> >> doesn't work because you don't own the GBK.
>> >> >>
>> >> >>>> This is e.g. what DatastoreIO does
>> >> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
>> >> >>>> and this is in general how most systems I've seen handle
>> >> >>>> backpressure. Is there something I'm missing? In particular, is
>> >> >>>> there any compelling reason why you think it'd be beneficial e.g.
>> >> >>>> for DatastoreIO to commit the results of the bundle so far before
>> >> >>>> processing other elements?
>> >> >>
>> >> >> It was more about ensuring you validate a subset of the whole
>> >> >> bundle early and avoid reprocessing it if it fails later.
>> >> >>
>> >> >>
>> >> >> So to summarize I see 2 outcomes:
>> >> >>
>> >> >> 1. implement SDF in all runners
>> >> >> 2. make the bundle size upper-bounded, through a pipeline option,
>> >> >> in all runners; not sure this one is doable everywhere since I
>> >> >> mainly checked the Spark case
>> >> >>
>> >> >>>>
>> >> >>>> Again, it might be that I'm still misunderstanding what you're
>> >> >>>> trying
>> >> >>>> to
>> >> >>>> say. One of the things it would help to clarify would be - exactly
>> >> >>>> what do
>> >> >>>> you mean by "how batch frameworks solved that for years": can you
>> >> >>>> point at
>> >> >>>> an existing API in some other framework that achieves what you
>> >> >>>> want?
>> >> >>>>
>> >> >>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
>> >> >>>> <rmannibu...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Eugene, the point - and the issue with a single sample - is that
>> >> >>>>> you can always find *workarounds* on a case-by-case basis, like
>> >> >>>>> the id one with ES, but Beam doesn't solve the problem as a
>> >> >>>>> framework.
>> >> >>>>>
>> >> >>>>> From my past experience, I clearly don't see how batch frameworks
>> >> >>>>> have solved that for years while Beam is not able to do it. Keep
>> >> >>>>> in mind it is the same kind of technology; it just uses different
>> >> >>>>> sources and bigger clusters, so there is no real reason not to
>> >> >>>>> have the same feature quality. The only potential reason I see is
>> >> >>>>> that there is no end-to-end tracking of the state in the cluster.
>> >> >>>>> But I don't see why there wouldn't be. Am I missing something
>> >> >>>>> here?
>> >> >>>>>
>> >> >>>>> An example could be a GitHub crawler, based on a REST client,
>> >> >>>>> computing stats on all GitHub repos. You will need to handle the
>> >> >>>>> rate limit, and you will likely want to "commit" each time you
>> >> >>>>> reach a rate limit, probably with some buffering strategy with a
>> >> >>>>> max size before really waiting. How do you do it with a GBK
>> >> >>>>> independent of your DoFn? You are not able to compose the fns
>> >> >>>>> correctly :(.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> On Nov 18, 2017 at 20:48, "Eugene Kirpichov"
>> >> >>>>> <kirpic...@google.com.invalid> wrote:
>> >> >>>>>
>> >> >>>>> After giving this thread my best attempt at understanding exactly
>> >> >>>>> what is
>> >> >>>>> the problem and the proposed solution, I'm afraid I still fail to
>> >> >>>>> understand both. To reiterate, I think the only way to make
>> >> >>>>> progress
>> >> >>>>> here
>> >> >>>>> is to be more concrete: (quote) take some IO that you think could
>> >> >>>>> be
>> >> >>>> easier
>> >> >>>>> to write with your proposed API, give the contents of a
>> >> >>>>> hypothetical
>> >> >>>>> PCollection being written to this IO, give the code of a
>> >> >>>>> hypothetical
>> >> >>>> DoFn
>> >> >>>>> implementing the write using your API, and explain what you'd
>> >> >>>>> expect
>> >> >>>>> to
>> >> >>>>> happen at runtime. I'm going to re-engage in this thread after
>> >> >>>>> such
>> >> >>>>> an
>> >> >>>>> example is given.
>> >> >>>>>
>> >> >>>>> On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
>> >> >>>>> <rmannibu...@gmail.com>
>> >> >>>>> wrote:
>> >> >>>>>
>> >> >>>>>> First, bundle retry is unusable with some runners like Spark,
>> >> >>>>>> where the bundle size is the collection size / number of
>> >> >>>>>> workers. This means a user can't use the bundle API or feature
>> >> >>>>>> reliably and portably - which is Beam's promise. Aligning
>> >> >>>>>> chunking and bundles would guarantee that, but it may not be
>> >> >>>>>> desired, which is why I thought it could be another feature.
>> >> >>>>>>
>> >> >>>>>> GBK works until the IO knows about that and both concepts are
>> >> >>>>>> not
>> >> >>>> always
>> >> >>>>>> orthogonal - backpressure like systems is a trivial common
>> >> >>>>>> example.
>> >> >>>> This
>> >> >>>>>> means the IO (dofn) must be able to do it itself at some point.
>> >> >>>>>>
>> >> >>>>>> Also note the GBK works only if the IO can take a list which is
>> >> >>>>>> never
>> >> >>>> the
>> >> >>>>>> case today.
>> >> >>>>>>
>> >> >>>>>> The big questions for me are: is SDF the way to go, since it
>> >> >>>>>> provides the needed API but is not yet supported? What about
>> >> >>>>>> existing IOs? Should Beam provide an auto-wrapping of DoFn for
>> >> >>>>>> that pre-aggregated support and simulate bundles to the actual
>> >> >>>>>> IO impl to keep the existing API?
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>> On Nov 17, 2017 at 19:20, "Raghu Angadi"
>> >> >>>>>> <rang...@google.com.invalid> wrote:
>> >> >>>>>>
>> >> >>>>>> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
>> >> >>>>> rmannibu...@gmail.com
>> >> >>>>>>>
>> >> >>>>>> wrote:
>> >> >>>>>>
>> >> >>>>>>> Yep, just take ES IO, if a part of a bundle fails you are in an
>> >> >>>>>>> unmanaged state. This is the case for all O (of IO ;)). Issue
>> >> >>>>>>> is
>> >> >>>>>>> not
>> >> >>>>>>> much about "1" (the code it takes) but more the fact it doesn't
>> >> >>>>>>> integrate with runner features and retries potentially: what
>> >> >>>>>>> happens
>> >> >>>>>>> if a bundle has a failure? => undefined today. 2. I'm fine with
>> >> >>>>>>> it
>> >> >>>>>>> while we know exactly what happens when we restart after a
>> >> >>>>>>> bundle
>> >> >>>>>>> failure. With ES the timestamp can be used for instance.
>> >> >>>>>>>
>> >> >>>>>>
>> >> >>>>>> This deterministic batching can be achieved even now with an
>> >> >>>>>> extra GroupByKey (and if you want ordering on top of that, you
>> >> >>>>>> will need another GBK). Don't know if that is too costly in your
>> >> >>>>>> case. I would need a bit more detail on handling ES IO write
>> >> >>>>>> retries to see if it could be simplified. Note that retries
>> >> >>>>>> occur with or without any failures in your DoFn.
>> >> >>>>>>
>> >> >>>>>> The biggest negative with GBK approach is that it doesn't
>> >> >>>>>> provide
>> >> >>>>>> same
>> >> >>>>>> guarantees on Flink.
>> >> >>>>>>
>> >> >>>>>> I don't see how GroupIntoBatches in Beam provides specific
>> >> >>>>>> guarantees
>> >> >>>> on
>> >> >>>>>> deterministic batches.
>> >> >>>>>>
>> >> >>>>>> Thinking about it the SDF is really a way to do it since the SDF
>> >> >>>>>> will
>> >> >>>>>>> manage the bulking and associated with the runner "retry" it
>> >> >>>>>>> seems
>> >> >>>>>>> it
>> >> >>>>>>> covers the needs.
>> >> >>>>>>>
>> >> >>>>>>> Romain Manni-Bucau
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
>> >> >>>>> <kirpic...@google.com.invalid
>> >> >>>>>>> :
>> >> >>>>>>>> I must admit I'm still failing to understand the problem, so
>> >> >>>>>>>> let's
>> >> >>>>> step
>> >> >>>>>>>> back even further.
>> >> >>>>>>>>
>> >> >>>>>>>> Could you give an example of an IO that is currently difficult
>> >> >>>>>>>> to
>> >> >>>>>>> implement
>> >> >>>>>>>> specifically because of lack of the feature you're talking
>> >> >>>>>>>> about?
>> >> >>>>>>>>
>> >> >>>>>>>> I'm asking because I've reviewed almost all Beam IOs and don't
>> >> >>>> recall
>> >> >>>>>>>> seeing a similar problem. Sure, a lot of IOs do batching
>> >> >>>>>>>> within a
>> >> >>>>>> bundle,
>> >> >>>>>>>> but 1) it doesn't take up much code (granted, it would be even
>> >> >>>> easier
>> >> >>>>>> if
>> >> >>>>>>>> Beam did it for us) and 2) I don't remember any of them
>> >> >>>>>>>> requiring
>> >> >>>> the
>> >> >>>>>>>> batches to be deterministic, and I'm having a hard time
>> >> >>>>>>>> imagining
>> >> >>>>> what
>> >> >>>>>>> kind
>> >> >>>>>>>> of storage system would be able to deduplicate if batches were
>> >> >>>>>>>> deterministic but wouldn't be able to deduplicate if they
>> >> >>>>>>>> weren't.
>> >> >>>>>>>>
>> >> >>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <
>> >> >>>>>>> rmannibu...@gmail.com>
>> >> >>>>>>>> wrote:
>> >> >>>>>>>>
>> >> >>>>>>>>> Ok, let me try to step back and summarize what we have today
>> >> >>>>>>>>> and
>> >> >>>>> what
>> >> >>>>>> I
>> >> >>>>>>>>> miss:
>> >> >>>>>>>>>
>> >> >>>>>>>>> 1. we can handle chunking in Beam through GroupIntoBatches (or
>> >> >>>>>>>>> equivalent) but:
>> >> >>>>>>>>>     > it is not built into the transforms (IO) and it is
>> >> >>>>>>>>> controlled from outside the transforms, so there is no way for
>> >> >>>>>>>>> a transform to do it properly without itself handling a
>> >> >>>>>>>>> composition and links between multiple DoFns to get
>> >> >>>>>>>>> notifications and potentially react properly or handle
>> >> >>>>>>>>> backpressure from its backend
>> >> >>>>>>>>> 2. there is no restart feature because there is no real state
>> >> >>>>>>>>> handling at the moment. This sounds fully delegated to the
>> >> >>>>>>>>> runner, but I was hoping to have more guarantees from the API
>> >> >>>>>>>>> to be able to restart a pipeline (mainly batch, since for
>> >> >>>>>>>>> streams it can be irrelevant or delegated to the backend) and
>> >> >>>>>>>>> handle only the uncommitted records, so it requires some
>> >> >>>>>>>>> persistence outside the main IO storages to do it properly
>> >> >>>>>>>>>     > note this is somewhat similar to the monitoring topic,
>> >> >>>>>>>>> which lacks persistence ATM, so Beam may end up needing a
>> >> >>>>>>>>> pluggable storage for a few concerns
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> Short term I would be happy with 1 solved properly, long term
>> >> >>>>>>>>> I
>> >> >>>> hope
>> >> >>>>> 2
>> >> >>>>>>>>> will be tackled without workarounds requiring custom wrapping
>> >> >>>>>>>>> of
>> >> >>>> IO
>> >> >>>>> to
>> >> >>>>>>>>> use a custom state persistence.
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
>> >> >>>>>>>>> <j...@nanthrax.net>:
>> >> >>>>>>>>>> Thanks for the explanation. Agree, we might talk about
>> >> >>>>>>>>>> different
>> >> >>>>>>> things
>> >> >>>>>>>>>> using the same wording.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> I'm also struggling to understand the use case (for a
>> >> >>>>>>>>>> generic
>> >> >>>>> DoFn).
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Regards
>> >> >>>>>>>>>> JB
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> To avoid spending a lot of time pursuing a false path, I'd
>> >> >>>>>>>>>>> like
>> >> >>>>> to
>> >> >>>>>>> say
>> >> >>>>>>>>>>> straight up that SDF is definitely not going to help here,
>> >> >>>>> despite
>> >> >>>>>>> the
>> >> >>>>>>>>>>> fact
>> >> >>>>>>>>>>> that its API includes the term "checkpoint". In SDF, the
>> >> >>>>>> "checkpoint"
>> >> >>>>>>>>>>> captures the state of processing within a single element.
>> >> >>>>>>>>>>> If
>> >> >>>>> you're
>> >> >>>>>>>>>>> applying an SDF to 1000 elements, it will, like any other
>> >> >>>>>>>>>>> DoFn,
>> >> >>>>> be
>> >> >>>>>>>>> applied
>> >> >>>>>>>>>>> to each of them independently and in parallel, and you'll
>> >> >>>>>>>>>>> have
>> >> >>>>> 1000
>> >> >>>>>>>>>>> checkpoints capturing the state of processing each of these
>> >> >>>>>> elements,
>> >> >>>>>>>>>>> which
>> >> >>>>>>>>>>> is probably not what you want.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> I'm afraid I still don't understand what kind of checkpoint
>> >> >>>>>>>>>>> you
>> >> >>>>>>> need, if
>> >> >>>>>>>>>>> it
>> >> >>>>>>>>>>> is not just deterministic grouping into batches.
>> >> >>>>>>>>>>> "Checkpoint"
>> >> >>>> is
>> >> >>>>> a
>> >> >>>>>>> very
>> >> >>>>>>>>>>> broad term and it's very possible that everybody in this
>> >> >>>>>>>>>>> thread
>> >> >>>>> is
>> >> >>>>>>>>> talking
>> >> >>>>>>>>>>> about different things when saying it. So it would help if
>> >> >>>>>>>>>>> you
>> >> >>>>>> could
>> >> >>>>>>>>> give
>> >> >>>>>>>>>>> a
>> >> >>>>>>>>>>> more concrete example: for example, take some IO that you
>> >> >>>>>>>>>>> think
>> >> >>>>>>> could be
>> >> >>>>>>>>>>> easier to write with your proposed API, give the contents
>> >> >>>>>>>>>>> of a
>> >> >>>>>>>>>>> hypothetical
>> >> >>>>>>>>>>> PCollection being written to this IO, give the code of a
>> >> >>>>>> hypothetical
>> >> >>>>>>>>> DoFn
>> >> >>>>>>>>>>> implementing the write using your API, and explain what
>> >> >>>>>>>>>>> you'd
>> >> >>>>>> expect
>> >> >>>>>>> to
>> >> >>>>>>>>>>> happen at runtime.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>> >> >>>>>>>>>>> <rmannibu...@gmail.com>
>> >> >>>>>>>>>>> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>> @Eugene: yes and the other alternative of Reuven too but
>> >> >>>>>>>>>>>> it
>> >> >>>>>>>>>>>> is
>> >> >>>>>> still
>> >> >>>>>>>>>>>> 1. relying on timers, 2. not really checkpointed
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> In other words it seems all solutions are to create a
>> >> >>>>>>>>>>>> chunk
>> >> >>>>>>>>>>>> of
>> >> >>>>>> size
>> >> >>>>>>> 1
>> >> >>>>>>>>>>>> and replayable to fake the lack of chunking in the
>> >> >>>>>>>>>>>> framework.
>> >> >>>>> This
>> >> >>>>>>>>>>>> always implies a chunk handling outside the component
>> >> >>>> (typically
>> >> >>>>>>>>>>>> before for an output). My point is I think IO need it in
>> >> >>>>>>>>>>>> their
>> >> >>>>> own
>> >> >>>>>>>>>>>> "internal" or at least control it themselves since the
>> >> >>>>>>>>>>>> chunk
>> >> >>>>> size
>> >> >>>>>> is
>> >> >>>>>>>>>>>> part of the IO handling most of the time.
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> I think JB spoke of the same "group before" trick using
>> >> >>>>>> restrictions
>> >> >>>>>>>>>>>> which can work I have to admit if SDF are implemented by
>> >> >>>>> runners.
>> >> >>>>>> Is
>> >> >>>>>>>>>>>> there a roadmap/status on that? Last time I checked SDF
>> >> >>>>>>>>>>>> was a
>> >> >>>>>> great
>> >> >>>>>>>>>>>> API without support :(.
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>> >> >>>>>>>>>>>> <kirpic...@google.com.invalid>:
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are
>> >> >>>>>>>>>>>>> unrelated,
>> >> >>>>> and
>> >> >>>>>>> the
>> >> >>>>>>>>>>>>> post
>> >> >>>>>>>>>>>>> doesn't mention the word. Did you mean something else,
>> >> >>>>>>>>>>>>> e.g.
>> >> >>>>>>>>> restriction
>> >> >>>>>>>>>>>>> perhaps? Either way I don't think SDFs are the solution
>> >> >>>>>>>>>>>>> here;
>> >> >>>>>> SDFs
>> >> >>>>>>>>> have
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> to
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> do with the ability to split the processing of *a single
>> >> >>>>> element*
>> >> >>>>>>> over
>> >> >>>>>>>>>>>>> multiple calls, whereas Romain I think is asking for
>> >> >>>> repeatable
>> >> >>>>>>>>> grouping
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> of
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> *multiple* elements.
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> Romain - does
>> >> >>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>> >> >>>>>>>>>>>>> do what you want?
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
>> >> >>>>>>>>> j...@nanthrax.net>
>> >> >>>>>>>>>>>>> wrote:
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, no
>> >> >>>>>>>>>>>>>> ?
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> Regards
>> >> >>>>>>>>>>>>>> JB
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> >> >>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>> It gives the fn/transform the ability to save a state that
>> >> >>>>>>>>>>>>>>> it can get back on "restart" (or whatever unit we can use,
>> >> >>>>>>>>>>>>>>> probably runner dependent?). Without that you need to
>> >> >>>>>>>>>>>>>>> rewrite all IO usage with something like the previous
>> >> >>>>>>>>>>>>>>> pattern, which makes the IO not self-sufficient and makes
>> >> >>>>>>>>>>>>>>> the entry cost and usage cost of Beam much higher.
>> >> >>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>> In my mind it is exactly what jbatch/spring-batch uses
>> >> >>>>>>>>>>>>>>> but
>> >> >>>>>>> adapted
>> >> >>>>>>>>> to
>> >> >>>>>>>>>>>>>>> beam (stream in particular) case.
>> >> >>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
>> >> >>>>> <re...@google.com.invalid
>> >> >>>>>>> :
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> Romain,
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are
>> >> >>>>>>>>>>>>>>>> the
>> >> >>>>>>> semantics,
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>> what
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> does it accomplish?
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> Reuven
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> rmannibu...@gmail.com>
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>> wrote:
>> >> >>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> Yes, what I proposed earlier was:
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> I. checkpoint marker:
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> @AnyBeamAnnotation
>> >> >>>>>>>>>>>>>>>>> @CheckpointAfter
>> >> >>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx);
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new
>> >> >>>>>>>>> MyFn()).withCheckpointAlgorithm(new
>> >> >>>>>>>>>>>>>>>>> CountingAlgo()))
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> III. (I like this one less)
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> // in the dofn
>> >> >>>>>>>>>>>>>>>>> @CheckpointTester
>> >> >>>>>>>>>>>>>>>>> public boolean shouldCheckpoint();
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in
>> >> >>>>>>>>>>>>>>>>> the DoFn, per element
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>> >> >>>>>>> <rang...@google.com.invalid
>> >> >>>>>>>>>>>>>
>> >> >>>>>>>>>>>>> :
>> >> >>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>> How would you define it (a rough API is fine)? Without more
>> >> >>>>>>>>>>>>>>>>>> details, it is not easy to see wider applicability and
>> >> >>>>>>>>>>>>>>>>>> feasibility in runners.
>> >> >>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau
>> >> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >> >>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>> This is a fair summary of the current state, but also of
>> >> >>>>>>>>>>>>>>>>>>> where Beam can add very strong value and make big data
>> >> >>>>>>>>>>>>>>>>>>> great and smooth.
>> >> >>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>> Instead of this replay feature, isn't checkpointing
>> >> >>>>>>>>>>>>>>>>>>> desirable? In particular with SDF, no?
>> >> >>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>> On 16 Nov 2017 at 19:50, "Raghu Angadi"
>> >> >>>>>>>>>>>>>>>>>>> <rang...@google.com.invalid> wrote:
>> >> >>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>> The core issue here is that there is no explicit concept of
>> >> >>>>>>>>>>>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
>> >> >>>>>>>>>>>>>>>>>>>> 'getCheckpointMark', but that refers to the checkpoint on
>> >> >>>>>>>>>>>>>>>>>>>> the external source). Runners do checkpoint internally, as
>> >> >>>>>>>>>>>>>>>>>>>> an implementation detail. Flink's checkpoint model is
>> >> >>>>>>>>>>>>>>>>>>>> entirely different from Dataflow's and Spark's.
>> >> >>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a
>> >> >>>>>>>>>>>>>>>>>>>> checkpoint by design.
>> >> >>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>> If you are looking to achieve some guarantees with a
>> >> >>>>>>>>>>>>>>>>>>>> sink/DoFn, I think it is better to start with the
>> >> >>>>>>>>>>>>>>>>>>>> requirements. I worked on an exactly-once sink for Kafka
>> >> >>>>>>>>>>>>>>>>>>>> (see KafkaIO.write().withEOS()), where we essentially
>> >> >>>>>>>>>>>>>>>>>>>> reshard the elements and assign sequence numbers to
>> >> >>>>>>>>>>>>>>>>>>>> elements within each shard. Duplicates in replays are
>> >> >>>>>>>>>>>>>>>>>>>> avoided based on these sequence numbers. The DoFn state API
>> >> >>>>>>>>>>>>>>>>>>>> is used to buffer out-of-order replays. The implementation
>> >> >>>>>>>>>>>>>>>>>>>> strategy works in Dataflow but not in Flink, which has a
>> >> >>>>>>>>>>>>>>>>>>>> horizontal checkpoint. KafkaIO checks for compatibility.
>> >> >>>>>>>>>>>>>>>>>>>>
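To make the sequence-number idea above concrete, here is a minimal standalone sketch in plain Java. It is not KafkaIO's implementation: `SequencedShardWriter`, `accept` and `written` are hypothetical names, the in-memory maps stand in for the DoFn state API, and the `written` list stands in for the external sink. It only illustrates dropping replayed duplicates and buffering out-of-order elements per shard.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of per-shard sequence numbers used to deduplicate replays. */
final class SequencedShardWriter {
    // Next sequence number expected for each shard (stand-in for persisted state).
    private final Map<Integer, Long> nextSeq = new HashMap<>();
    // Elements that arrived ahead of the expected sequence number, per shard.
    private final Map<Integer, TreeMap<Long, String>> pending = new HashMap<>();
    // Stand-in for the external sink: records what was written, in order.
    private final List<String> written = new ArrayList<>();

    void accept(int shard, long seq, String value) {
        long expected = nextSeq.getOrDefault(shard, 0L);
        if (seq < expected) {
            return; // Already written: this is a duplicate from a replay.
        }
        TreeMap<Long, String> buffer = pending.computeIfAbsent(shard, k -> new TreeMap<>());
        buffer.put(seq, value);
        // Write out every buffered element that is now contiguous with what was written.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            written.add(buffer.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(shard, expected);
    }

    List<String> written() {
        return written;
    }
}
```

The sketch keeps everything in one process; in a real pipeline the expected sequence number and the buffer would have to live in state that survives a replay, which is where the runner's checkpoint model matters.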
>> >> >>>>>>>>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau
>> >> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >> >>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> Hi guys,
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> The subject is a bit provocative, but the topic is real and
>> >> >>>>>>>>>>>>>>>>>>>>> keeps coming up with Beam usage: how can a DoFn handle some
>> >> >>>>>>>>>>>>>>>>>>>>> "chunking"?
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> The need is to be able to commit every N records, with N
>> >> >>>>>>>>>>>>>>>>>>>>> not too big.
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> The natural API for that in Beam is the bundle, but bundles
>> >> >>>>>>>>>>>>>>>>>>>>> are not reliable: they can be very small (Flink), which we
>> >> >>>>>>>>>>>>>>>>>>>>> can call "ok" even if it has some performance impact, or
>> >> >>>>>>>>>>>>>>>>>>>>> too big (Spark does full size / #workers).
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize
>> >> >>>>>>>>>>>>>>>>>>>>> which triggers an eager flush. The issue is that the
>> >> >>>>>>>>>>>>>>>>>>>>> checkpoint is then not respected and you can process the
>> >> >>>>>>>>>>>>>>>>>>>>> same records multiple times.
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> Is there any plan to make this API reliable and
>> >> >>>>>>>>>>>>>>>>>>>>> controllable from a Beam point of view (at least by setting
>> >> >>>>>>>>>>>>>>>>>>>>> a max)?
>> >> >>>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>>> Thanks,
>> >> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>> >> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>>>>>>>>>>>>>>>>>>>>
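The maxSize workaround described in the message above can be sketched in a few lines of plain Java. This is an illustration only, not the Elasticsearch IO code: `MaxSizeBatcher`, `add` and `finishBundle` are hypothetical names, and the `Consumer` stands in for whatever bulk write the sink performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers elements and hands them to a sink in batches of at most maxSize. */
final class MaxSizeBatcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    MaxSizeBatcher(int maxSize, Consumer<List<T>> sink) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.sink = sink;
    }

    /** Called once per element; flushes eagerly as soon as the buffer reaches maxSize. */
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Called when the runner ends the bundle; flushes whatever is left. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand a copy to the sink so clearing the buffer does not affect it.
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

The eager flush in `add` happens before the runner has committed the bundle, so if the bundle is retried the same records can be flushed again. That is the duplicate-processing issue raised in the message.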
>> >> >>>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>>> --
>> >> >>>>>>>>>>>>>> Jean-Baptiste Onofré
>> >> >>>>>>>>>>>>>> jbono...@apache.org
>> >> >>>>>>>>>>>>>> http://blog.nanthrax.net
>> >> >>>>>>>>>>>>>> Talend - http://www.talend.com
>> >> >>>>>>>>>>>>>>
>> >> >>>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> --
>> >> >>>>>>>>>> Jean-Baptiste Onofré
>> >> >>>>>>>>>> jbono...@apache.org
>> >> >>>>>>>>>> http://blog.nanthrax.net
>> >> >>>>>>>>>> Talend - http://www.talend.com
>> >> >>>>>>>>>
>> >> >>>>>>>
>> >> >>>>>>
>> >> >>>>>
>> >> >>>>
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> jbono...@apache.org
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
>>
>
