2017-11-30 20:36 GMT+01:00 Kenneth Knowles <k...@google.com>:
> On Thu, Nov 30, 2017 at 11:28 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>>
>> This is my short term concern yes. Note that the opposite is not sane
>> neither (too big) cause it forces eager flushes in all IO (so instead
>> of fixing it once in a single code location you impact everyone N
>> times). However it is not blocking as the small bundle size issue.
>
>
> So which runner is raising your concern? I guess the FlinkRunner? IIRC the
> SparkRunner in streaming has bundles governed by microbatching and in batch
> has large bundles like you would expect.

Only checked flink and spark so yes from my side but inputs on other
runners would be great.

Symmetrically and for the simplicity of writing IO, would having a
BaseFlushingWriter help to automatically handle this "if currentsize >
maxflushsize then flush()" logic?

>
> Kenn
>
>> Next concenr is commit handling/idempotence but this is more
>> complicated so can be worth another thread once this one is fixed
>> which can end up on being specific each time :(.
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-30 20:21 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>> > I mean: if these runners have some limitation that forces them into only
>> > supporting tiny bundles, there's a good chance that this limitation will
>> > also apply to whatever beam model API you propose as a fix, and they
>> > won't
>> > be able to implement it.
>> >
>> >
>> > On Thu, Nov 30, 2017, 11:19 AM Eugene Kirpichov <kirpic...@google.com>
>> > wrote:
>> >>
>> >> So is your main concern potential poor performance on runners that
>> >> choose
>> >> to use a very small bundle size? (Currently an IO can trivially handle
>> >> too
>> >> large bundles, simply by flushing when enough data accumulates, which
>> >> is
>> >> what all IOs do - but indeed working around having unreasonably small
>> >> bundles is much harder)
>> >>
>> >> If so, I think, rather than making a model change, we should understand
>> >> why those runners are choosing such a small bundle size, and
>> >> potentially fix
>> >> them.
>> >>
>> >>
>> >> On Thu, Nov 30, 2017, 11:01 AM Romain Manni-Bucau
>> >> <rmannibu...@gmail.com>
>> >> wrote:
>> >>>
>> >>>
>> >>>
>> >>> Le 30 nov. 2017 19:23, "Kenneth Knowles" <k...@google.com> a écrit :
>> >>>
>> >>> On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau
>> >>> <rmannibu...@gmail.com> wrote:
>> >>>>
>> >>>> Hmm,
>> >>>>
>> >>>> ESIO:
>> >>>>
>> >>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
>> >>>> JDBCIO:
>> >>>>
>> >>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
>> >>>> MongoIO:
>> >>>>
>> >>>> https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
>> >>>> etc...
>> >>>>
>> >>>> They all use the same pattern.
>> >>>
>> >>>
>> >>> This is actually correct - if you have some triggers set up to yield
>> >>> some
>> >>> low latency, then things need to actually be flushed here. If the
>> >>> runner
>> >>> provides a one-element bundle it could be because data volume has
>> >>> dipped. In
>> >>> this case, you pay per element instead of getting good amortization,
>> >>> but
>> >>> since data volume is low this is not so bad and anyhow the only way to
>> >>> yield
>> >>> the desired latency.
>> >>>
>> >>> Romain - just to echo some others, did you have a particular
>> >>> combination
>> >>> of runner + IO that you wanted to target for improvement? That would
>> >>> focus
>> >>> the discussion and we could think about what to change in the runner
>> >>> or IO
>> >>> or discover an issue that they cannot solve.
>> >>>
>> >>>
>> >>> I want to ensure EsIO will never do a flush of 1 element on any runner
>> >>> without a timertrigger - assuming data volume is continuous and with a
>> >>> size
>> >>> > 1.
>> >>>
>> >>> Really rephrased, my concern is that bundle which is a great
>> >>> infra/environment feedback is today owned by beam code which defeats
>> >>> that
>> >>> great purpose since beam doesnt use the impl for that. This notion
>> >>> should be
>> >>> unified (size + timeout are often the defaults to trigger an "end" and
>> >>> would
>> >>> work) accross runners or it should be hidden from the transform
>> >>> developers
>> >>> IMHO.
>> >>>
>> >>> Note the low latency point is not linked to bundle size but, as you
>> >>> mentionned, triggers (timeout or event based) which means both worlds
>> >>> can
>> >>> work together in harmony and outcome to a valid bundle api (without
>> >>> any
>> >>> change, yeah) and exposure to the user.
>> >>>
>> >>>
>> >>>
>> >>> Kenn
>> >>>
>> >>>
>> >>>>
>> >>>>
>> >>>> From what you wrote - and technically I agree but in current state my
>> >>>> point is valid I think - you should drop bundle from the whole user
>> >>>> API and make it all @Internal, no?
>> >>>>
>> >>>>
>> >>>> Romain Manni-Bucau
>> >>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>
>> >>>>
>> >>>> 2017-11-30 18:58 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >>>> > Agree, but maybe we can inform the runner if wanted no ?
>> >>>> >
>> >>>> > Honestly, from my side, I'm fine with the current situation as it's
>> >>>> > runner
>> >>>> > specific.
>> >>>> >
>> >>>> > Regards
>> >>>> > JB
>> >>>> >
>> >>>> > On 11/30/2017 06:12 PM, Reuven Lax wrote:
>> >>>> >>
>> >>>> >> I don't think it belongs in PIpelineOptions, as bundle size is
>> >>>> >> always
>> >>>> >> a
>> >>>> >> runner thing.
>> >>>> >>
>> >>>> >> We could consider adding a new generic RunnerOptions, however I'm
>> >>>> >> not
>> >>>> >> convinced all runners can actually support this.
>> >>>> >>
>> >>>> >> On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau
>> >>>> >> <rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>> wrote:
>> >>>> >>
>> >>>> >>     Guys,
>> >>>> >>
>> >>>> >>     what about moving getMaxBundleSize from flink options to
>> >>>> >> pipeline
>> >>>> >>     options. I think all runners can support it right? Spark code
>> >>>> >> needs
>> >>>> >>     the merge of the v2 before being able to be implemented
>> >>>> >> probably
>> >>>> >> but I
>> >>>> >>     don't see any blocker.
>> >>>> >>
>> >>>> >>     wdyt?
>> >>>> >>
>> >>>> >>     Romain Manni-Bucau
>> >>>> >>     @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>> >>
>> >>>> >>
>> >>>> >>     2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau
>> >>>> >> <rmannibu...@gmail.com
>> >>>> >>     <mailto:rmannibu...@gmail.com>>:
>> >>>> >>
>> >>>> >>      > @Eugene: "workaround" as specific to the IO each time and
>> >>>> >> therefore
>> >>>> >>      > still highlight a lack in the core.
>> >>>> >>      >
>> >>>> >>      > Other comments inline
>> >>>> >>      >
>> >>>> >>      >
>> >>>> >>      > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
>> >>>> >> <rober...@google.com.invalid>:
>> >>>> >>      >> There is a possible fourth issue that we don't handle
>> >>>> >> well:
>> >>>> >> efficiency. For
>> >>>> >>      >> very large bundles, it may be advantageous to avoid
>> >>>> >> replaying
>> >>>> >> a
>> >>>> >> bunch of
>> >>>> >>      >> idempotent operations if there were a way to record what
>> >>>> >> ones
>> >>>> >> we're sure
>> >>>> >>      >> went through. Not sure if that's the issue here (though
>> >>>> >> one
>> >>>> >> could
>> >>>> >> possibly
>> >>>> >>      >> do this with SDFs, one can preemptively returning
>> >>>> >> periodically
>> >>>> >> before an
>> >>>> >>      >> element (or portion thereof) is done).
>> >>>> >>      >
>> >>>> >>      > +1, also lead to the IO handling its own chunking/bundles
>> >>>> >> and
>> >>>> >>      > therefore solves all issues at once.
>> >>>> >>      >
>> >>>> >>      >>
>> >>>> >>      >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>> >>>> >>      >> kirpic...@google.com.invalid> wrote:
>> >>>> >>      >>
>> >>>> >>      >>> I disagree that the usage of document id in ES is a
>> >>>> >> "workaround"
>> >>>> >> - it does
>> >>>> >>      >>> not address any *accidental *complexity
>> >>>> >>      >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet
>> >>>> >>     <https://en.wikipedia.org/wiki/No_Silver_Bullet>> coming from
>> >>>> >> shortcomings
>> >>>> >>      >>> of Beam, it addresses the *essential* complexity that a
>> >>>> >> distributed system
>> >>>> >>      >>> forces one to take it as a fact of nature that the same
>> >>>> >> write
>> >>>> >>      >>> (mutation) will happen multiple times, so if you want a
>> >>>> >> mutation
>> >>>> >> to happen
>> >>>> >>      >>> "as-if" it happened exactly once, the mutation itself
>> >>>> >> must
>> >>>> >> be
>> >>>> >> idempotent
>> >>>> >>      >>> <https://en.wikipedia.org/wiki/Idempotence
>> >>>> >>     <https://en.wikipedia.org/wiki/Idempotence>>. Insert-with-id
>> >>>> >> (upsert
>> >>>> >>      >>> <https://en.wikipedia.org/wiki/Merge_(SQL)
>> >>>> >>     <https://en.wikipedia.org/wiki/Merge_(SQL)>>) is a classic
>> >>>> >> example of
>> >>>> >> an
>> >>>> >>      >>> idempotent mutation, and it's very good that
>> >>>> >> Elasticsearch
>> >>>> >> provides it - if
>> >>>> >>      >>> it didn't, no matter how good of an API Beam had,
>> >>>> >> achieving
>> >>>> >> exactly-once
>> >>>> >>      >>> writes would be theoretically impossible. Are we in
>> >>>> >> agreement on
>> >>>> >> this so
>> >>>> >>      >>> far?
>> >>>> >>      >>>
>> >>>> >>      >>> Next: you seem to be discussing 3 issues together, all of
>> >>>> >> which
>> >>>> >> are valid
>> >>>> >>      >>> issues, but they seem unrelated to me:
>> >>>> >>      >>> 1. Exactly-once mutation
>> >>>> >>      >>> 2. Batching multiple mutations into one RPC.
>> >>>> >>      >>> 3. Backpressure
>> >>>> >>      >>>
>> >>>> >>      >>> #1: was considered above. The system the IO is talking to
>> >>>> >> has to
>> >>>> >> support
>> >>>> >>      >>> idempotent mutations, in an IO-specific way, and the IO
>> >>>> >> has
>> >>>> >> to
>> >>>> >> take
>> >>>> >>      >>> advantage of them, in the IO-specific way - end of story.
>> >>>> >>      >
>> >>>> >>      > Agree but don't forget the original point was about
>> >>>> >> "chunks"
>> >>>> >> and
>> >>>> >> not
>> >>>> >>      > individual records.
>> >>>> >>      >
>> >>>> >>      >>>
>> >>>> >>      >>> #2: a batch of idempotent operations is also idempotent,
>> >>>> >> so
>> >>>> >> this
>> >>>> >> doesn't
>> >>>> >>      >>> add anything new semantically. Syntactically - Beam
>> >>>> >> already
>> >>>> >> allows you to
>> >>>> >>      >>> write your own batching by notifying you of permitted
>> >>>> >> batch
>> >>>> >> boundaries
>> >>>> >>      >>> (Start/FinishBundle). Sure, it could do more, but from my
>> >>>> >> experience the
>> >>>> >>      >>> batching in IOs I've seen is one of the easiest and least
>> >>>> >> error-prone
>> >>>> >>      >>> parts, so I don't see something worth an extended
>> >>>> >> discussion
>> >>>> >> here.
>> >>>> >>      >
>> >>>> >>      > "Beam already allows you to
>> >>>> >>      >  write your own batching by notifying you of permitted
>> >>>> >> batch
>> >>>> >> boundaries
>> >>>> >>      >  (Start/FinishBundle)"
>> >>>> >>      >
>> >>>> >>      > Is wrong since the bundle is potentially the whole
>> >>>> >> PCollection
>> >>>> >> (spark)
>> >>>> >>      > so this is not even an option until you use the SDF (back
>> >>>> >> to
>> >>>> >> the
>> >>>> >> same
>> >>>> >>      > point).
>> >>>> >>      > Once again the API looks fine but no implementation makes
>> >>>> >> it
>> >>>> >> true.
>> >>>> >> It
>> >>>> >>      > would be easy to change it in spark, flink can be ok since
>> >>>> >> it
>> >>>> >> targets
>> >>>> >>      > more the streaming case, not sure of others, any idea?
>> >>>> >>      >
>> >>>> >>      >
>> >>>> >>      >>>
>> >>>> >>      >>> #3: handling backpressure is a complex problem with
>> >>>> >> multiple
>> >>>> >> facets: 1) how
>> >>>> >>      >>> do you know you're being throttled, and by how much are
>> >>>> >> you
>> >>>> >> exceeding the
>> >>>> >>      >>> external system's capacity?
>> >>>> >>      >
>> >>>> >>      > This is the whole point of backpressure, the system sends
>> >>>> >> it
>> >>>> >> back
>> >>>> >> to
>> >>>> >>      > you (header like or status technic in general)
>> >>>> >>      >
>> >>>> >>      >>> 2) how do you communicate this signal to the
>> >>>> >>      >>> runner?
>> >>>> >>      >
>> >>>> >>      > You are a client so you get the meta in the response -
>> >>>> >> whatever
>> >>>> >> techno.
>> >>>> >>      >
>> >>>> >>      >>> 3) what does the runner do in response?
>> >>>> >>      >
>> >>>> >>      > Runner nothing but the IO adapts its handling as mentionned
>> >>>> >> before
>> >>>> >>      > (wait and retry, skip, ... depending the config)
>> >>>> >>      >
>> >>>> >>      >>> 4) how do you wait until
>> >>>> >>      >>> it's ok to try again?
>> >>>> >>      >
>> >>>> >>      > This is one point to probably enhance in beam but waiting
>> >>>> >> in
>> >>>> >> the
>> >>>> >>      > processing is an option if the source has some buffering
>> >>>> >> otherwise
>> >>>> >> it
>> >>>> >>      > requires to have a buffer fallback and max size if the wait
>> >>>> >> mode is
>> >>>> >>      > activated.
>> >>>> >>      >
>> >>>> >>      >>> You seem to be advocating for solving one facet of this
>> >>>> >> problem,
>> >>>> >> which is:
>> >>>> >>      >>> you want it to be possible to signal to the runner "I'm
>> >>>> >> being
>> >>>> >> throttled,
>> >>>> >>      >>> please end the bundle", right? If so - I think this
>> >>>> >> (ending
>> >>>> >> the
>> >>>> >> bundle) is
>> >>>> >>      >>> unnecessary: the DoFn can simply do an exponential
>> >>>> >> back-off
>> >>>> >> sleep
>> >>>> >> loop.
>> >>>> >>      >
>> >>>> >>      > Agree, never said the runner should know but GBK+output
>> >>>> >> doesnt
>> >>>> >> work
>> >>>> >>      > cause you dont own the GBK.
>> >>>> >>      >
>> >>>> >>      >>> This is e.g. what DatastoreIO does
>> >>>> >>      >>> <https://github.com/apache/beam/blob/master/sdks/java/io/
>> >>>> >>     <https://github.com/apache/beam/blob/master/sdks/java/io/>
>> >>>> >>      >>> google-cloud-platform/src/main/java/org/apache/beam/sdk/
>> >>>> >>      >>> io/gcp/datastore/DatastoreV1.java#L1318>
>> >>>> >>      >>> and
>> >>>> >>      >>> this is in general how most systems I've seen handle
>> >>>> >> backpressure. Is there
>> >>>> >>      >>> something I'm missing? In particular, is there any
>> >>>> >> compelling
>> >>>> >> reason why
>> >>>> >>      >>> you think it'd be beneficial e.g. for DatastoreIO to
>> >>>> >> commit
>> >>>> >> the
>> >>>> >> results of
>> >>>> >>      >>> the bundle so far before processing other elements?
>> >>>> >>      >
>> >>>> >>      > It was more about ensuring you validate early a subset of
>> >>>> >> the
>> >>>> >> whole
>> >>>> >>      > bundle and avoid to reprocess it if it fails later.
>> >>>> >>      >
>> >>>> >>      >
>> >>>> >>      > So to summarize I see 2 outcomes:
>> >>>> >>      >
>> >>>> >>      > 1. impl SDF in all runners
>> >>>> >>      > 2. make the bundle size upper bounded - through a pipeline
>> >>>> >> option -
>> >>>> >> in
>> >>>> >>      > all runners, not sure this one is doable everywhere since I
>> >>>> >> mainly
>> >>>> >>      > checked spark case
>> >>>> >>      >
>> >>>> >>      >>>
>> >>>> >>      >>> Again, it might be that I'm still misunderstanding what
>> >>>> >> you're
>> >>>> >> trying to
>> >>>> >>      >>> say. One of the things it would help to clarify would be
>> >>>> >> -
>> >>>> >> exactly what do
>> >>>> >>      >>> you mean by "how batch frameworks solved that for years":
>> >>>> >> can you
>> >>>> >> point at
>> >>>> >>      >>> an existing API in some other framework that achieves
>> >>>> >> what
>> >>>> >> you
>> >>>> >> want?
>> >>>> >>      >>>
>> >>>> >>      >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
>> >>>> >>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> wrote:
>> >>>> >>      >>>
>> >>>> >>      >>> > Eugene, point - and issue with a single sample - is you
>> >>>> >> can
>> >>>> >> always find
>> >>>> >>      >>> > *workarounds* on a case by case basis as the id one
>> >>>> >> with
>> >>>> >> ES but
>> >>>> >> beam
>> >>>> >>      >>> doesnt
>> >>>> >>      >>> > solve the problem as a framework.
>> >>>> >>      >>> >
>> >>>> >>      >>> > From my past, I clearly dont see how batch frameworks
>> >>>> >> solved
>> >>>> >> that for
>> >>>> >>      >>> years
>> >>>> >>      >>> > and beam is not able to do it - keep in mind it is the
>> >>>> >> same
>> >>>> >> kind of
>> >>>> >>      >>> techno,
>> >>>> >>      >>> > it just uses different sources and bigger clusters so
>> >>>> >> no
>> >>>> >> real
>> >>>> >> reason to
>> >>>> >>      >>> not
>> >>>> >>      >>> > have the same feature quality. The only potential
>> >>>> >> reason i
>> >>>> >> see
>> >>>> >> is there
>> >>>> >>      >>> is
>> >>>> >>      >>> > no tracking of the state into the cluster - e2e. But i
>> >>>> >> dont see
>> >>>> >> why there
>> >>>> >>      >>> > wouldnt be. Do I miss something here?
>> >>>> >>      >>> >
>> >>>> >>      >>> > An example could be: take a github crawler computing
>> >>>> >> stats
>> >>>> >> on
>> >>>> >> the whole
>> >>>> >>      >>> > girhub repos which is based on a rest client as
>> >>>> >> example.
>> >>>> >> You
>> >>>> >> will need to
>> >>>> >>      >>> > handle the rate limit and likely want to "commit" each
>> >>>> >> time you
>> >>>> >> reach a
>> >>>> >>      >>> > rate limit with likely some buffering strategy with a
>> >>>> >> max
>> >>>> >> size
>> >>>> >> before
>> >>>> >>      >>> > really waiting. How do you do it with a GBK independent
>> >>>> >> of
>> >>>> >> your
>> >>>> >> dofn? You
>> >>>> >>      >>> > are not able to compose correctly the fn between them
>> >>>> >> :(.
>> >>>> >>      >>> >
>> >>>> >>      >>> >
>> >>>> >>      >>> > Le 18 nov. 2017 20:48, "Eugene Kirpichov"
>> >>>> >> <kirpic...@google.com.invalid>
>> >>>> >>      >>> a
>> >>>> >>      >>> > écrit :
>> >>>> >>      >>> >
>> >>>> >>      >>> > After giving this thread my best attempt at
>> >>>> >> understanding
>> >>>> >> exactly what is
>> >>>> >>      >>> > the problem and the proposed solution, I'm afraid I
>> >>>> >> still
>> >>>> >> fail
>> >>>> >> to
>> >>>> >>      >>> > understand both. To reiterate, I think the only way to
>> >>>> >> make
>> >>>> >> progress here
>> >>>> >>      >>> > is to be more concrete: (quote) take some IO that you
>> >>>> >> think
>> >>>> >> could be
>> >>>> >>      >>> easier
>> >>>> >>      >>> > to write with your proposed API, give the contents of a
>> >>>> >> hypothetical
>> >>>> >>      >>> > PCollection being written to this IO, give the code of
>> >>>> >> a
>> >>>> >> hypothetical
>> >>>> >>      >>> DoFn
>> >>>> >>      >>> > implementing the write using your API, and explain what
>> >>>> >> you'd
>> >>>> >> expect to
>> >>>> >>      >>> > happen at runtime. I'm going to re-engage in this
>> >>>> >> thread
>> >>>> >> after
>> >>>> >> such an
>> >>>> >>      >>> > example is given.
>> >>>> >>      >>> >
>> >>>> >>      >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau
>> >>>> >>     <rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > wrote:
>> >>>> >>      >>> >
>> >>>> >>      >>> > > First bundle retry is unusable with dome runners like
>> >>>> >> spark
>> >>>> >> where the
>> >>>> >>      >>> > > bundle size is the collection size / number of work.
>> >>>> >> This
>> >>>> >> means a user
>> >>>> >>      >>> > cant
>> >>>> >>      >>> > > use bundle API or feature reliably and portably -
>> >>>> >> which
>> >>>> >> is
>> >>>> >> beam
>> >>>> >>      >>> promise.
>> >>>> >>      >>> > > Aligning chunking and bundles would guarantee that
>> >>>> >> bit
>> >>>> >> can be
>> >>>> >> not
>> >>>> >>      >>> > desired,
>> >>>> >>      >>> > > that is why i thought it can be another feature.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > GBK works until the IO knows about that and both
>> >>>> >> concepts are
>> >>>> >> not
>> >>>> >>      >>> always
>> >>>> >>      >>> > > orthogonal - backpressure like systems is a trivial
>> >>>> >> common
>> >>>> >> example.
>> >>>> >>      >>> This
>> >>>> >>      >>> > > means the IO (dofn) must be able to do it itself at
>> >>>> >> some
>> >>>> >> point.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > Also note the GBK works only if the IO can take a
>> >>>> >> list
>> >>>> >> which
>> >>>> >> is never
>> >>>> >>      >>> the
>> >>>> >>      >>> > > case today.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > Big questions for me are: is SDF the way to go since
>> >>>> >> it
>> >>>> >> provides the
>> >>>> >>      >>> > needed
>> >>>> >>      >>> > > API bit is not yet supported? What about existing IO?
>> >>>> >> Should
>> >>>> >> beam
>> >>>> >>      >>> provide
>> >>>> >>      >>> > > an auto wrapping of dofn for that pre-aggregated
>> >>>> >> support
>> >>>> >> and
>> >>>> >> simulate
>> >>>> >>      >>> > > bundles to the actual IO impl to keep the existing
>> >>>> >> API?
>> >>>> >>      >>> > >
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > Le 17 nov. 2017 19:20, "Raghu Angadi"
>> >>>> >> <rang...@google.com.invalid> a
>> >>>> >>      >>> > > écrit :
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
>> >>>> >>      >>> > rmannibu...@gmail.com <mailto:rmannibu...@gmail.com>
>> >>>> >>
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > > wrote:
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > > Yep, just take ES IO, if a part of a bundle fails
>> >>>> >> you
>> >>>> >> are
>> >>>> >> in an
>> >>>> >>      >>> > > > unmanaged state. This is the case for all O (of IO
>> >>>> >> ;)).
>> >>>> >> Issue is not
>> >>>> >>      >>> > > > much about "1" (the code it takes) but more the
>> >>>> >> fact
>> >>>> >> it
>> >>>> >> doesn't
>> >>>> >>      >>> > > > integrate with runner features and retries
>> >>>> >> potentially:
>> >>>> >> what happens
>> >>>> >>      >>> > > > if a bundle has a failure? => undefined today. 2.
>> >>>> >> I'm
>> >>>> >> fine
>> >>>> >> with it
>> >>>> >>      >>> > > > while we know exactly what happens when we restart
>> >>>> >> after a
>> >>>> >> bundle
>> >>>> >>      >>> > > > failure. With ES the timestamp can be used for
>> >>>> >> instance.
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > This deterministic batching can be achieved even now
>> >>>> >> with an
>> >>>> >> extra
>> >>>> >>      >>> > > GroupByKey (and if you want ordering on top of that,
>> >>>> >> will
>> >>>> >> need another
>> >>>> >>      >>> > > GBK). Don't know if that is too costly in your case.
>> >>>> >> I
>> >>>> >> would
>> >>>> >> need bit
>> >>>> >>      >>> > more
>> >>>> >>      >>> > > details on handling ES IO write retries to see it
>> >>>> >> could
>> >>>> >> be
>> >>>> >> simplified.
>> >>>> >>      >>> > Note
>> >>>> >>      >>> > > that retries occur with or without any failures in
>> >>>> >> your
>> >>>> >> DoFn.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > The biggest negative with GBK approach is that it
>> >>>> >> doesn't
>> >>>> >> provide same
>> >>>> >>      >>> > > guarantees on Flink.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > I don't see how GroubIntoBatches in Beam provides
>> >>>> >> specific
>> >>>> >> guarantees
>> >>>> >>      >>> on
>> >>>> >>      >>> > > deterministic batches.
>> >>>> >>      >>> > >
>> >>>> >>      >>> > > Thinking about it the SDF is really a way to do it
>> >>>> >> since
>> >>>> >> the
>> >>>> >> SDF will
>> >>>> >>      >>> > > > manage the bulking and associated with the runner
>> >>>> >> "retry"
>> >>>> >> it seems it
>> >>>> >>      >>> > > > covers the needs.
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > > > Romain Manni-Bucau
>> >>>> >>      >>> > > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
>> >>>> >>      >>> > <kirpic...@google.com.invalid
>> >>>> >>      >>> > > >:
>> >>>> >>      >>> > > > > I must admit I'm still failing to understand the
>> >>>> >> problem,
>> >>>> >> so let's
>> >>>> >>      >>> > step
>> >>>> >>      >>> > > > > back even further.
>> >>>> >>      >>> > > > >
>> >>>> >>      >>> > > > > Could you give an example of an IO that is
>> >>>> >> currently
>> >>>> >> difficult to
>> >>>> >>      >>> > > > implement
>> >>>> >>      >>> > > > > specifically because of lack of the feature
>> >>>> >> you're
>> >>>> >> talking about?
>> >>>> >>      >>> > > > >
>> >>>> >>      >>> > > > > I'm asking because I've reviewed almost all Beam
>> >>>> >> IOs
>> >>>> >> and
>> >>>> >> don't
>> >>>> >>      >>> recall
>> >>>> >>      >>> > > > > seeing a similar problem. Sure, a lot of IOs do
>> >>>> >> batching
>> >>>> >> within a
>> >>>> >>      >>> > > bundle,
>> >>>> >>      >>> > > > > but 1) it doesn't take up much code (granted, it
>> >>>> >> would be
>> >>>> >> even
>> >>>> >>      >>> easier
>> >>>> >>      >>> > > if
>> >>>> >>      >>> > > > > Beam did it for us) and 2) I don't remember any
>> >>>> >> of
>> >>>> >> them
>> >>>> >> requiring
>> >>>> >>      >>> the
>> >>>> >>      >>> > > > > batches to be deterministic, and I'm having a
>> >>>> >> hard
>> >>>> >> time
>> >>>> >> imagining
>> >>>> >>      >>> > what
>> >>>> >>      >>> > > > kind
>> >>>> >>      >>> > > > > of storage system would be able to deduplicate if
>> >>>> >> batches
>> >>>> >> were
>> >>>> >>      >>> > > > > deterministic but wouldn't be able to deduplicate
>> >>>> >> if
>> >>>> >> they
>> >>>> >> weren't.
>> >>>> >>      >>> > > > >
>> >>>> >>      >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain
>> >>>> >> Manni-Bucau
>> >>>> >> <
>> >>>> >>      >>> > > > rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > > > > wrote:
>> >>>> >>      >>> > > > >
>> >>>> >>      >>> > > > >> Ok, let me try to step back and summarize what
>> >>>> >> we
>> >>>> >> have
>> >>>> >> today and
>> >>>> >>      >>> > what
>> >>>> >>      >>> > > I
>> >>>> >>      >>> > > > >> miss:
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >> 1. we can handle chunking in beam through group
>> >>>> >> in
>> >>>> >> batch
>> >>>> >> (or
>> >>>> >>      >>> > > equivalent)
>> >>>> >>      >>> > > > >> but:
>> >>>> >>      >>> > > > >>    > it is not built-in into the transforms (IO)
>> >>>> >> and it
>> >>>> >> is
>> >>>> >>      >>> > controlled
>> >>>> >>      >>> > > > >> from outside the transforms so no way for a
>> >>>> >> transform to
>> >>>> >> do it
>> >>>> >>      >>> > > > >> properly without handling itself a composition
>> >>>> >> and
>> >>>> >> links
>> >>>> >> between
>> >>>> >>      >>> > > > >> multiple dofns to have notifications and
>> >>>> >> potentially
>> >>>> >> react
>> >>>> >>      >>> properly
>> >>>> >>      >>> > or
>> >>>> >>      >>> > > > >> handle backpressure from its backend
>> >>>> >>      >>> > > > >> 2. there is no restart feature because there is
>> >>>> >> no
>> >>>> >> real
>> >>>> >> state
>> >>>> >>      >>> > handling
>> >>>> >>      >>> > > > >> at the moment. this sounds fully delegated to
>> >>>> >> the
>> >>>> >> runner
>> >>>> >> but I was
>> >>>> >>      >>> > > > >> hoping to have more guarantees from the used API
>> >>>> >> to
>> >>>> >> be
>> >>>> >> able to
>> >>>> >>      >>> > restart
>> >>>> >>      >>> > > > >> a pipeline (mainly batch since it can be
>> >>>> >> irrelevant
>> >>>> >> or
>> >>>> >> delegates
>> >>>> >>      >>> to
>> >>>> >>      >>> > > > >> the backend for streams) and handle only not
>> >>>> >> commited
>> >>>> >> records so
>> >>>> >>      >>> it
>> >>>> >>      >>> > > > >> requires some persistence outside the main IO
>> >>>> >> storages
>> >>>> >> to do it
>> >>>> >>      >>> > > > >> properly
>> >>>> >>      >>> > > > >>    > note this is somehow similar to the
>> >>>> >> monitoring
>> >>>> >> topic which
>> >>>> >>      >>> miss
>> >>>> >>      >>> > > > >> persistence ATM so it can end up to beam to have
>> >>>> >> a
>> >>>> >> pluggable
>> >>>> >>      >>> storage
>> >>>> >>      >>> > > > >> for a few concerns
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >> Short term I would be happy with 1 solved
>> >>>> >> properly,
>> >>>> >> long
>> >>>> >> term I
>> >>>> >>      >>> hope
>> >>>> >>      >>> > 2
>> >>>> >>      >>> > > > >> will be tackled without workarounds requiring
>> >>>> >> custom
>> >>>> >> wrapping of
>> >>>> >>      >>> IO
>> >>>> >>      >>> > to
>> >>>> >>      >>> > > > >> use a custom state persistence.
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >> Romain Manni-Bucau
>> >>>> >>      >>> > > > >> @rmannibucau |  Blog | Old Blog | Github |
>> >>>> >> LinkedIn
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré
>> >>>> >>     <j...@nanthrax.net <mailto:j...@nanthrax.net>>:
>> >>>> >>
>> >>>> >>      >>> > > > >> > Thanks for the explanation. Agree, we might
>> >>>> >> talk
>> >>>> >> about
>> >>>> >> different
>> >>>> >>      >>> > > > things
>> >>>> >>      >>> > > > >> > using the same wording.
>> >>>> >>      >>> > > > >> >
>> >>>> >>      >>> > > > >> > I'm also struggling to understand the use case
>> >>>> >> (for a
>> >>>> >> generic
>> >>>> >>      >>> > DoFn).
>> >>>> >>      >>> > > > >> >
>> >>>> >>      >>> > > > >> > Regards
>> >>>> >>      >>> > > > >> > JB
>> >>>> >>      >>> > > > >> >
>> >>>> >>      >>> > > > >> >
>> >>>> >>      >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov
>> >>>> >> wrote:
>> >>>> >>      >>> > > > >> >>
>> >>>> >>      >>> > > > >> >> To avoid spending a lot of time pursuing a
>> >>>> >> false
>> >>>> >> path, I'd like
>> >>>> >>      >>> > to
>> >>>> >>      >>> > > > say
>> >>>> >>      >>> > > > >> >> straight up that SDF is definitely not going
>> >>>> >> to
>> >>>> >> help
>> >>>> >> here,
>> >>>> >>      >>> > despite
>> >>>> >>      >>> > > > the
>> >>>> >>      >>> > > > >> >> fact
>> >>>> >>      >>> > > > >> >> that its API includes the term "checkpoint".
>> >>>> >> In
>> >>>> >> SDF,
>> >>>> >> the
>> >>>> >>      >>> > > "checkpoint"
>> >>>> >>      >>> > > > >> >> captures the state of processing within a
>> >>>> >> single
>> >>>> >> element. If
>> >>>> >>      >>> > you're
>> >>>> >>      >>> > > > >> >> applying an SDF to 1000 elements, it will,
>> >>>> >> like
>> >>>> >> any
>> >>>> >> other DoFn,
>> >>>> >>      >>> > be
>> >>>> >>      >>> > > > >> applied
>> >>>> >>      >>> > > > >> >> to each of them independently and in
>> >>>> >> parallel,
>> >>>> >> and
>> >>>> >> you'll have
>> >>>> >>      >>> > 1000
>> >>>> >>      >>> > > > >> >> checkpoints capturing the state of processing
>> >>>> >> each of
>> >>>> >> these
>> >>>> >>      >>> > > elements,
>> >>>> >>      >>> > > > >> >> which
>> >>>> >>      >>> > > > >> >> is probably not what you want.
>> >>>> >>      >>> > > > >> >>
>> >>>> >>      >>> > > > >> >> I'm afraid I still don't understand what kind
>> >>>> >> of
>> >>>> >> checkpoint you
>> >>>> >>      >>> > > > need, if
>> >>>> >>      >>> > > > >> >> it
>> >>>> >>      >>> > > > >> >> is not just deterministic grouping into
>> >>>> >> batches.
>> >>>> >> "Checkpoint"
>> >>>> >>      >>> is
>> >>>> >>      >>> > a
>> >>>> >>      >>> > > > very
>> >>>> >>      >>> > > > >> >> broad term and it's very possible that
>> >>>> >> everybody
>> >>>> >> in
>> >>>> >> this thread
>> >>>> >>      >>> > is
>> >>>> >>      >>> > > > >> talking
>> >>>> >>      >>> > > > >> >> about different things when saying it. So it
>> >>>> >> would
>> >>>> >> help if you
>> >>>> >>      >>> > > could
>> >>>> >>      >>> > > > >> give
>> >>>> >>      >>> > > > >> >> a
>> >>>> >>      >>> > > > >> >> more concrete example: for example, take some
>> >>>> >> IO
>> >>>> >> that
>> >>>> >> you think
>> >>>> >>      >>> > > > could be
>> >>>> >>      >>> > > > >> >> easier to write with your proposed API, give
>> >>>> >> the
>> >>>> >> contents of a
>> >>>> >>      >>> > > > >> >> hypothetical
>> >>>> >>      >>> > > > >> >> PCollection being written to this IO, give
>> >>>> >> the
>> >>>> >> code
>> >>>> >> of a
>> >>>> >>      >>> > > hypothetical
>> >>>> >>      >>> > > > >> DoFn
>> >>>> >>      >>> > > > >> >> implementing the write using your API, and
>> >>>> >> explain
>> >>>> >> what you'd
>> >>>> >>      >>> > > expect
>> >>>> >>      >>> > > > to
>> >>>> >>      >>> > > > >> >> happen at runtime.
>> >>>> >>      >>> > > > >> >>
>> >>>> >>      >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain
>> >>>> >> Manni-Bucau
>> >>>> >>      >>> > > > >> >> <rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > > > >> >> wrote:
>> >>>> >>      >>> > > > >> >>
>> >>>> >>      >>> > > > >> >>> @Eugene: yes and the other alternative of
>> >>>> >> Reuven too
>> >>>> >> but it is
>> >>>> >>      >>> > > still
>> >>>> >>      >>> > > > >> >>> 1. relying on timers, 2. not really
>> >>>> >> checkpointed
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> In other words it seems all solutions are to
>> >>>> >> create
>> >>>> >> a chunk of
>> >>>> >>      >>> > > size
>> >>>> >>      >>> > > > 1
>> >>>> >>      >>> > > > >> >>> and replayable to fake the lack of chunking
>> >>>> >> in
>> >>>> >> the
>> >>>> >> framework.
>> >>>> >>      >>> > This
>> >>>> >>      >>> > > > >> >>> always implies a chunk handling outside the
>> >>>> >> component
>> >>>> >>      >>> (typically
>> >>>> >>      >>> > > > >> >>> before for an output). My point is I think
>> >>>> >> IO
>> >>>> >> need
>> >>>> >> it in their
>> >>>> >>      >>> > own
>> >>>> >>      >>> > > > >> >>> "internal" or at least control it themselves
>> >>>> >> since
>> >>>> >> the chunk
>> >>>> >>      >>> > size
>> >>>> >>      >>> > > is
>> >>>> >>      >>> > > > >> >>> part of the IO handling most of the time.
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> I think JB spoke of the same "group before"
>> >>>> >> trick
>> >>>> >> using
>> >>>> >>      >>> > > restrictions
>> >>>> >>      >>> > > > >> >>> which can work I have to admit if SDF are
>> >>>> >> implemented by
>> >>>> >>      >>> > runners.
>> >>>> >>      >>> > > Is
>> >>>> >>      >>> > > > >> >>> there a roadmap/status on that? Last time I
>> >>>> >> checked
>> >>>> >> SDF was a
>> >>>> >>      >>> > > great
>> >>>> >>      >>> > > > >> >>> API without support :(.
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> Romain Manni-Bucau
>> >>>> >>      >>> > > > >> >>> @rmannibucau |  Blog | Old Blog | Github |
>> >>>> >> LinkedIn
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>> >>>> >>      >>> > > > >> >>> <kirpic...@google.com.invalid>:
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> JB, not sure what you mean? SDFs and
>> >>>> >> triggers
>> >>>> >> are
>> >>>> >> unrelated,
>> >>>> >>      >>> > and
>> >>>> >>      >>> > > > the
>> >>>> >>      >>> > > > >> >>>> post
>> >>>> >>      >>> > > > >> >>>> doesn't mention the word. Did you mean
>> >>>> >> something
>> >>>> >> else, e.g.
>> >>>> >>      >>> > > > >> restriction
>> >>>> >>      >>> > > > >> >>>> perhaps? Either way I don't think SDFs are
>> >>>> >> the
>> >>>> >> solution here;
>> >>>> >>      >>> > > SDFs
>> >>>> >>      >>> > > > >> have
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> to
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> do with the ability to split the processing
>> >>>> >> of
>> >>>> >> *a
>> >>>> >> single
>> >>>> >>      >>> > element*
>> >>>> >>      >>> > > > over
>> >>>> >>      >>> > > > >> >>>> multiple calls, whereas Romain I think is
>> >>>> >> asking
>> >>>> >> for
>> >>>> >>      >>> repeatable
>> >>>> >>      >>> > > > >> grouping
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> of
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> *multiple* elements.
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> Romain - does
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >>
>> >>>> >> https://github.com/apache/beam/blob/master/sdks/java/
>> >>>> >>     <https://github.com/apache/beam/blob/master/sdks/java/>
>> >>>> >>      >>> > > > core/src/main/java/org/apache/beam/sdk/transforms/
>> >>>> >>      >>> GroupIntoBatches.java
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> do what
>> >>>> >>      >>> > > > >> >>>> you want?
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM
>> >>>> >> Jean-Baptiste
>> >>>> >> Onofré <
>> >>>> >>      >>> > > > >> j...@nanthrax.net <mailto:j...@nanthrax.net>>
>> >>>> >>      >>> > > > >> >>>> wrote:
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>>> It sounds like the "Trigger" in the
>> >>>> >> Splittable
>> >>>> >> DoFn, no ?
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn
>> >>>> >>     <https://beam.apache.org/blog/2017/08/16/splittable-do-fn>.
>> >>>> >>
>> >>>> >>      >>> html
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> Regards
>> >>>> >>      >>> > > > >> >>>>> JB
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau
>> >>>> >> wrote:
>> >>>> >>      >>> > > > >> >>>>>>
>> >>>> >>      >>> > > > >> >>>>>> it gives the fn/transform the ability to
>> >>>> >> save a
>> >>>> >> state - it
>> >>>> >>      >>> > can
>> >>>> >>      >>> > > > get
>> >>>> >>      >>> > > > >> >>>>>> back on "restart" / whatever unit we can
>> >>>> >> use,
>> >>>> >> probably
>> >>>> >>      >>> runner
>> >>>> >>      >>> > > > >> >>>>>> dependent? Without that you need to
>> >>>> >> rewrite
>> >>>> >> all
>> >>>> >> IO usage
>> >>>> >>      >>> with
>> >>>> >>      >>> > > > >> >>>>>> something like the previous pattern which
>> >>>> >> makes
>> >>>> >> the IO not
>> >>>> >>      >>> > self
>> >>>> >>      >>> > > > >> >>>>>> sufficient and kind of makes the entry
>> >>>> >> cost
>> >>>> >> and
>> >>>> >> usage of
>> >>>> >>      >>> beam
>> >>>> >>      >>> > > way
>> >>>> >>      >>> > > > >> >>>>>> further.
>> >>>> >>      >>> > > > >> >>>>>>
>> >>>> >>      >>> > > > >> >>>>>> In my mind it is exactly what
>> >>>> >> jbatch/spring-batch
>> >>>> >> uses but
>> >>>> >>      >>> > > > adapted
>> >>>> >>      >>> > > > >> to
>> >>>> >>      >>> > > > >> >>>>>> beam (stream in particular) case.
>> >>>> >>      >>> > > > >> >>>>>>
>> >>>> >>      >>> > > > >> >>>>>> Romain Manni-Bucau
>> >>>> >>      >>> > > > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github
>> >>>> >> |
>> >>>> >> LinkedIn
>> >>>> >>      >>> > > > >> >>>>>>
>> >>>> >>      >>> > > > >> >>>>>>
>> >>>> >>      >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
>> >>>> >>      >>> > <re...@google.com.invalid
>> >>>> >>      >>> > > >:
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> Romain,
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> Can you define what you mean by
>> >>>> >> checkpoint?
>> >>>> >> What
>> >>>> >> are the
>> >>>> >>      >>> > > > semantics,
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> what
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> does it accomplish?
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> Reuven
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain
>> >>>> >> Manni-Bucau <
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>> wrote:
>> >>>> >>      >>> > > > >> >>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> Yes, what I propose earlier was:
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> I. checkpoint marker:
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> @AnyBeamAnnotation
>> >>>> >>      >>> > > > >> >>>>>>>> @CheckpointAfter
>> >>>> >>      >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new
>> >>>> >>      >>> > > > >> MyFn()).withCheckpointAlgorithm(new
>> >>>> >>      >>> > > > >> >>>>>>>> CountingAlgo()))
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> III. (I like this one less)
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> // in the dofn
>> >>>> >>      >>> > > > >> >>>>>>>> @CheckpointTester
>> >>>> >>      >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable
>> >>>> >> getCheckpoint();
>> >>>> >> in the
>> >>>> >>      >>> dofn
>> >>>> >>      >>> > > per
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> element
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> Romain Manni-Bucau
>> >>>> >>      >>> > > > >> >>>>>>>> @rmannibucau |  Blog | Old Blog |
>> >>>> >> Github |
>> >>>> >> LinkedIn
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
>> >>>> >>      >>> > > > <rang...@google.com.invalid
>> >>>> >>      >>> > > > >> >>>>
>> >>>> >>      >>> > > > >> >>>> :
>> >>>> >>      >>> > > > >> >>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>> How would you define it (rough API is
>> >>>> >> fine)?.
>> >>>> >> Without
>> >>>> >>      >>> more
>> >>>> >>      >>> > > > >> details,
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> it is
>> >>>> >>      >>> > > > >> >>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>> not easy to see wider applicability
>> >>>> >> and
>> >>>> >> feasibility in
>> >>>> >>      >>> > > > runners.
>> >>>> >>      >>> > > > >> >>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM,
>> >>>> >> Romain
>> >>>> >> Manni-Bucau <
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > > > >> >>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>> wrote:
>> >>>> >>      >>> > > > >> >>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> This is a fair summary of the current
>> >>>> >> state
>> >>>> >> but also
>> >>>> >>      >>> > where
>> >>>> >>      >>> > > > beam
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> can
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> have a
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> very strong added value and make big
>> >>>> >> data
>> >>>> >> great and
>> >>>> >>      >>> > smooth.
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> Instead of this replay feature isnt
>> >>>> >> checkpointing
>> >>>> >>      >>> > willable?
>> >>>> >>      >>> > > > In
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> particular
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> with SDF no?
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi"
>> >>>> >>      >>> > > > >> <rang...@google.com.invalid>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> a
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> écrit :
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> Core issue here is that there is no
>> >>>> >> explicit
>> >>>> >> concept
>> >>>> >>      >>> of
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> 'checkpoint'
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> in
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> Beam (UnboundedSource has a method
>> >>>> >> 'getCheckpointMark'
>> >>>> >>      >>> > but
>> >>>> >>      >>> > > > that
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> refers to
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> the checkoint on external source).
>> >>>> >> Runners
>> >>>> >> do
>> >>>> >>      >>> checkpoint
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> internally
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> as
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> implementation detail. Flink's
>> >>>> >> checkpoint
>> >>>> >> model is
>> >>>> >>      >>> > > entirely
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> different
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> from
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not
>> >>>> >> explicitly talk
>> >>>> >>      >>> > about
>> >>>> >>      >>> > > a
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> checkpoint
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> by
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> design.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> If you are looking to achieve some
>> >>>> >> guarantees with a
>> >>>> >>      >>> > > > >> sink/DoFn, I
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> think
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> it
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> is better to start with the
>> >>>> >> requirements. I
>> >>>> >> worked on
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> exactly-once
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> sink
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>> for
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> Kafka (see
>> >>>> >> KafkaIO.write().withEOS()),
>> >>>> >> where
>> >>>> >> we
>> >>>> >>      >>> > > essentially
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> reshard
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> the
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> elements and assign sequence numbers
>> >>>> >> to
>> >>>> >> elements with
>> >>>> >>      >>> in
>> >>>> >>      >>> > > > each
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> shard.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> Duplicates in replays are avoided
>> >>>> >> based
>> >>>> >> on
>> >>>> >> these
>> >>>> >>      >>> > sequence
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> numbers.
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> DoFn
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> state API is used to buffer out-of
>> >>>> >> order
>> >>>> >> replays. The
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> implementation
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not
>> >>>> >> in
>> >>>> >> Flink
>> >>>> >> which has
>> >>>> >>      >>> a
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> horizontal
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> checkpoint. KafkaIO checks for
>> >>>> >> compatibility.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM,
>> >>>> >> Romain
>> >>>> >> Manni-Bucau <
>> >>>> >>      >>> > > > >> >>>>>>>>>>> rmannibu...@gmail.com
>> >>>> >> <mailto:rmannibu...@gmail.com>>
>> >>>> >>
>> >>>> >>      >>> > > > >> >>>>>>>>>>> wrote:
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> Hi guys,
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative
>> >>>> >> but
>> >>>> >> the
>> >>>> >> topic is
>> >>>> >>      >>> real
>> >>>> >>      >>> > > and
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> coming
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> again and again with the beam
>> >>>> >> usage:
>> >>>> >> how a
>> >>>> >> dofn can
>> >>>> >>      >>> > > handle
>> >>>> >>      >>> > > > >> some
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> "chunking".
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> The need is to be able to commit
>> >>>> >> each
>> >>>> >> N
>> >>>> >> records but
>> >>>> >>      >>> > with
>> >>>> >>      >>> > > N
>> >>>> >>      >>> > > > not
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> too
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> big.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> The natural API for that in beam is
>> >>>> >> the
>> >>>> >> bundle one
>> >>>> >>      >>> but
>> >>>> >>      >>> > > > bundles
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> are
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> not
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> reliable since they can be very
>> >>>> >> small
>> >>>> >> (flink) - we
>> >>>> >>      >>> can
>> >>>> >>      >>> > > say
>> >>>> >>      >>> > > > it
>> >>>> >>      >>> > > > >> is
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> "ok"
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> even if it has some perf impacts -
>> >>>> >> or
>> >>>> >> too
>> >>>> >> big (spark
>> >>>> >>      >>> > does
>> >>>> >>      >>> > > > full
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> size
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> /
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> #workers).
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> The workaround is what we see in
>> >>>> >> the
>> >>>> >> ES
>> >>>> >> I/O: a
>> >>>> >>      >>> maxSize
>> >>>> >>      >>> > > > which
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> does
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> an
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> eager flush. The issue is that then
>> >>>> >> the
>> >>>> >> checkpoint is
>> >>>> >>      >>> > not
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>> respected
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> and you can process multiple times
>> >>>> >> the
>> >>>> >> same
>> >>>> >> records.
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable
>> >>>> >> and
>> >>>> >> controllable
>> >>>> >>      >>> > from
>> >>>> >>      >>> > > a
>> >>>> >>      >>> > > > >> beam
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>> point
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> of view (at least in a max manner)?
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> Thanks,
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>> >>>> >>      >>> > > > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog |
>> >>>> >> Github |
>> >>>> >> LinkedIn
>> >>>> >>      >>> > > > >> >>>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>>>>
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>>> --
>> >>>> >>      >>> > > > >> >>>>> Jean-Baptiste Onofré
>> >>>> >>      >>> > > > >> >>>>> jbono...@apache.org
>> >>>> >> <mailto:jbono...@apache.org>
>> >>>> >>      >>> > > > >> >>>>> http://blog.nanthrax.net
>> >>>> >>      >>> > > > >> >>>>> Talend - http://www.talend.com
>> >>>> >>      >>> > > > >> >>>>>
>> >>>> >>      >>> > > > >> >>>
>> >>>> >>      >>> > > > >> >>
>> >>>> >>      >>> > > > >> >
>> >>>> >>      >>> > > > >> > --
>> >>>> >>      >>> > > > >> > Jean-Baptiste Onofré
>> >>>> >>      >>> > > > >> > jbono...@apache.org
>> >>>> >> <mailto:jbono...@apache.org>
>> >>>> >>      >>> > > > >> > http://blog.nanthrax.net
>> >>>> >>      >>> > > > >> > Talend - http://www.talend.com
>> >>>> >>      >>> > > > >>
>> >>>> >>      >>> > > >
>> >>>> >>      >>> > >
>> >>>> >>      >>> >
>> >>>> >>      >>>
>> >>>> >>
>> >>>> >>
>> >>>> >
>> >>>> > --
>> >>>> > Jean-Baptiste Onofré
>> >>>> > jbono...@apache.org
>> >>>> > http://blog.nanthrax.net
>> >>>> > Talend - http://www.talend.com
>> >>>>
>> >>>
>> >>>
>> >
>>
>

Reply via email to