@Ben: would all IOs be rewritten to use that, with the bundle concept dropped from the API, to avoid the ambiguity and misleading usage seen in current IOs?

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-30 18:43 GMT+01:00 Ben Chambers <bchamb...@google.com>:
> Beam includes a GroupIntoBatches transform (see
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java),
> which I believe was intended to be used as part of such a portable IO. It
> can be used to request that elements are divided into batches of some size,
> which can then be used for further processing.

On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2017-11-30 18:11 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> Very strong -1 from me:
> - Having a pipeline-global parameter is bad because it will apply to all
> transforms, with no ability to control it for individual transforms. This
> can go especially poorly because when I write a transform, I don't know
> whether a user will set this parameter in their pipeline to a value that is
> perhaps good for the user's transform but really bad for mine; and the user
> will likely blame my transform for poor performance. A parameter like this
> should be set on exactly the thing it applies to, e.g. on the particular
> IO; and it should be set by the IO itself, not by a user in pipeline
> options, because the IO author likely knows a good value better than a
> user does.

This is true, but it is worse today: the user can't tune it and the IO doesn't handle it either. It is up to the runner, and none implements it in an IO-friendly way - check Flink and Spark, which do the exact opposite (bundle = 1 vs bundle = dataset / partitions).

Also note that in the proposal it is a "max", not an exact value.

> - The parameter will not achieve what many IOs want, either. In some cases
> you want to limit the number of bytes you write. In some cases you want to
> limit the number of values within a key that you write. In some cases it's
> something else - it isn't always elements.

Elements are the only thing users can really tune, since you can't assume anything about the content.

> - The parameter will achieve none of the goals that you, I think, raised in
> the thread above: it gives neither deterministic replay nor any kind of
> fault tolerance.

Right, it only partially solves the first issue that popped up: chunking. However, I think it is a quick win.

> - Having a parameter like this *at all* goes against Beam's "no knobs"
> philosophy, for all the usual reasons: 1) it encourages users to waste
> time looking in the wrong places when doing performance tuning - tuning
> parameters is almost never the best way to improve performance; 2) when
> users can set a tuning parameter, in my experience it is almost always set
> wrong, or perhaps it was once set right but then nobody updates it when
> the use case or implementation changes, and we can end up in a situation
> where the pipeline is performing poorly because of the parameter but the
> runner isn't allowed to choose a better value. (In experience with legacy
> data processing systems at Google, like MapReduce, that support plenty of
> tuning parameters, very common advice to someone complaining about a
> poorly performing job is "have you tried removing all your parameters?")

I would be fine with that, but what is the alternative?

> - I still fail to understand the exact issue we're talking about, and I've
> made a number of suggestions as to how this understanding could be
> achieved: show code that demonstrates the issue, and show how the code
> could be improved by a hypothetical API.
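The "max, not an exact value" semantics of batch size that the thread keeps returning to can be illustrated with a small self-contained sketch of the pattern GroupIntoBatches implements. This is plain Java illustrating the semantics, not the Beam API itself:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of "group into batches of at most N" semantics: the batch
// size is an upper bound, not a guarantee. The final batch (and, in a real
// runner, any batch cut at a bundle or window boundary) may be smaller.
public class GroupIntoBatchesSketch {
    public static <T> List<List<T>> batch(List<T> elements, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T e : elements) {
            current.add(e);
            if (current.size() >= maxBatchSize) { // cut a batch once the max is reached
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) { // leftover elements form a smaller final batch
            batches.add(current);
        }
        return batches;
    }
}
```

Batching 5 elements with a max of 2 yields three batches, the last of size 1 - which is why a "max bundle size" knob cannot promise exact batch sizes.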
The first immediately blocking issue is how to batch records reliably and *portably* (the biggest Beam added value, IMHO). Since bundles are the "flush" trigger for most IOs, that means ensuring the bundle size is somehow controllable, or at least not set to a very small value out of the box.

An alternative to this proposal could be to let an IO give a hint about its desired bundle size. That would work as well for this particular issue. Does it sound better?

On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> It sounds reasonable to me.
>
> And agree for Spark, I would like to merge the Spark 2 update first.
>
> Regards
> JB

On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
> Guys,
>
> what about moving getMaxBundleSize from the Flink options to the pipeline
> options? I think all runners can support it, right? The Spark code probably
> needs the v2 merge before it can be implemented, but I don't see any
> blocker.
>
> wdyt?

2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:

@Eugene: "workaround" in the sense that it is specific to each IO, and therefore still highlights a gap in the core.

Other comments inline.

2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
> There is a possible fourth issue that we don't handle well: efficiency. For
> very large bundles, it may be advantageous to avoid replaying a bunch of
> idempotent operations if there were a way to record which ones we're sure
> went through. Not sure if that's the issue here (though one could possibly
> do this with SDFs, by preemptively returning periodically before an
> element (or portion thereof) is done).

+1; it also leads to the IO handling its own chunking/bundles and therefore solves all the issues at once.

On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> I disagree that the usage of a document id in ES is a "workaround" - it
> does not address any *accidental* complexity
> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings
> of Beam; it addresses the *essential* complexity: a distributed system
> forces one to take it as a fact of nature that the same write (mutation)
> will happen multiple times, so if you want a mutation to happen "as if" it
> happened exactly once, the mutation itself must be idempotent
> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert
> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an
> idempotent mutation, and it's very good that Elasticsearch provides it -
> if it didn't, no matter how good an API Beam had, achieving exactly-once
> writes would be theoretically impossible. Are we in agreement on this so
> far?
>
> Next: you seem to be discussing three issues together, all of which are
> valid, but they seem unrelated to me:
> 1. Exactly-once mutation
> 2. Batching multiple mutations into one RPC
> 3. Backpressure
>
> #1 was considered above. The system the IO is talking to has to support
> idempotent mutations, in an IO-specific way, and the IO has to take
> advantage of them, in the IO-specific way - end of story.

Agreed, but don't forget the original point was about "chunks", not individual records.

> #2: a batch of idempotent operations is also idempotent, so this doesn't
> add anything new semantically. Syntactically, Beam already allows you to
> write your own batching by notifying you of permitted batch boundaries
> (Start/FinishBundle). Sure, it could do more, but in my experience the
> batching in the IOs I've seen is one of the easiest and least error-prone
> parts, so I don't see something worth an extended discussion here.

"Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle)" is wrong in practice, since the bundle is potentially the whole PCollection (Spark), so this is not even an option until you use SDF (back to the same point). Once again, the API looks fine but no implementation makes it true. It would be easy to change in Spark; Flink can be OK since it targets the streaming case more. Not sure about the other runners - any idea?

> #3: handling backpressure is a complex problem with multiple facets:
> 1) how do you know you're being throttled, and by how much are you
> exceeding the external system's capacity?
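The Start/FinishBundle batching pattern Eugene refers to can be sketched in plain Java (the class and method names mirror the DoFn lifecycle but are illustrative, not the Beam API): buffer in per-element processing, flush when a size threshold is hit, and flush the remainder at the bundle boundary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the buffer-and-flush pattern most IO writers implement: flush
// when a max batch size is reached, and again in finishBundle so nothing is
// lost at the bundle boundary. Romain's objection: on Spark a "bundle" may
// be an entire partition, so the size-based flush is the only real bound.
public class BufferingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> bulkWrite; // stands in for the real bulk RPC
    private final List<T> buffer = new ArrayList<>();

    public BufferingWriter(int maxBatchSize, Consumer<List<T>> bulkWrite) {
        this.maxBatchSize = maxBatchSize;
        this.bulkWrite = bulkWrite;
    }

    public void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    public void finishBundle() { // runner-declared batch boundary
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            bulkWrite.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

The pattern is simple, which supports Eugene's "least error-prone part" claim; the thread's dispute is only about whether runners cut bundles at useful sizes.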
That is the whole point of backpressure: the system sends it back to you (via a header or a status code, in general).

> 2) how do you communicate this signal to the runner?

You are a client, so you get the metadata in the response - whatever the technology.

> 3) what does the runner do in response?

The runner: nothing. The IO adapts its own handling, as mentioned before (wait and retry, skip, ..., depending on the config).

> 4) how do you wait until it's ok to try again?

This is one point to probably enhance in Beam, but waiting inside the processing is an option if the source has some buffering; otherwise it requires a buffer fallback with a max size, if the wait mode is activated.

> You seem to be advocating for solving one facet of this problem, which is:
> you want it to be possible to signal to the runner "I'm being throttled,
> please end the bundle", right? If so, I think this (ending the bundle) is
> unnecessary: the DoFn can simply do an exponential back-off sleep loop.

Agreed - I never said the runner should know - but GBK + output doesn't work, because you don't own the GBK.

> This is e.g. what DatastoreIO does
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
> and this is in general how most systems I've seen handle backpressure. Is
> there something I'm missing? In particular, is there any compelling reason
> why you think it'd be beneficial e.g. for DatastoreIO to commit the
> results of the bundle so far before processing other elements?

It was more about validating a subset of the whole bundle early, to avoid reprocessing it if the bundle fails later.

So to summarize, I see two outcomes:

1. implement SDF in all runners
2. make the bundle size upper-bounded - through a pipeline option - in all
   runners; not sure this one is doable everywhere, since I mainly checked
   the Spark case

> Again, it might be that I'm still misunderstanding what you're trying to
> say. One of the things it would help to clarify: exactly what do you mean
> by "how batch frameworks solved that for years" - can you point at an
> existing API in some other framework that achieves what you want?

On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Eugene, the point - and the issue with a single sample - is that you can always find *workarounds* on a case-by-case basis, like the id one with ES, but Beam doesn't solve the problem as a framework.

From my past experience, batch frameworks clearly solved this years ago, and I don't see why Beam cannot do the same - keep in mind it is the same kind of technology, just with different sources and bigger clusters, so there is no real reason not to have the same feature quality. The only potential reason I see is that there is no end-to-end tracking of state in the cluster. But I don't see why there couldn't be.
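The "exponential back-off sleep loop" Eugene suggests as the in-DoFn alternative to ending the bundle can be sketched as follows. This is plain Java; the call and sleeper interfaces are illustrative stand-ins, not the actual DatastoreV1 implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Sketch of an exponential back-off retry loop: delays grow as
// base * 2^attempt up to a cap. The sleeper is injected so tests (and
// callers wanting jitter or interruption handling) can substitute it.
public class Backoff {
    public interface Sleeper { void sleep(long millis); }

    // `call` returns true on success, false when throttled.
    // Returns the delays actually waited, to make the behavior visible.
    public static List<Long> retry(int maxAttempts, long baseMillis, long capMillis,
                                   BooleanSupplier call, Sleeper sleeper) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (call.getAsBoolean()) {
                return delays; // success: stop retrying
            }
            long delay = Math.min(capMillis, baseMillis << attempt); // doubling, capped
            delays.add(delay);
            sleeper.sleep(delay);
        }
        throw new RuntimeException("giving up after " + maxAttempts + " attempts");
    }
}
```

The key property for this discussion: the back-off happens entirely inside the element processing, so no runner cooperation (and no bundle boundary) is needed.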
Am I missing something here?

An example: take a GitHub crawler computing stats over all GitHub repos, based on a REST client. You need to handle the rate limit, and you likely want to "commit" each time you hit the rate limit, with some buffering strategy with a max size before really waiting. How do you do that with a GBK independent of your DoFn? You are not able to compose the fns correctly between them :(.

Le 18 nov. 2017 20:48, "Eugene Kirpichov" <kirpic...@google.com.invalid> a écrit :
> After giving this thread my best attempt at understanding exactly what is
> the problem and the proposed solution, I'm afraid I still fail to
> understand both. To reiterate, I think the only way to make progress here
> is to be more concrete: (quote) take some IO that you think could be
> easier to write with your proposed API, give the contents of a
> hypothetical PCollection being written to this IO, give the code of a
> hypothetical DoFn implementing the write using your API, and explain what
> you'd expect to happen at runtime. I'm going to re-engage in this thread
> after such an example is given.

On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

First, bundle retry is unusable with some runners, like Spark, where the bundle size is the collection size / number of workers. This means a user can't use the bundle API or feature reliably and portably - which is Beam's promise. Aligning chunking and bundles would guarantee it, but that can be undesired, which is why I thought it could be a separate feature.

GBK works until the IO needs to know about it, and the two concepts are not always orthogonal - backpressure-like systems are a trivial, common example. This means the IO (DoFn) must be able to do it itself at some point.

Also note that the GBK approach works only if the IO can take a list, which is never the case today.

The big questions for me are: is SDF the way to go, since it provides the needed API but is not yet supported? What about existing IOs? Should Beam provide auto-wrapping of the DoFn for that pre-aggregated support, simulating bundles for the actual IO implementation to keep the existing API?

Le 17 nov. 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid> a écrit :

On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Yep, just take the ES IO: if part of a bundle fails, you are in an
> unmanaged state. This is the case for all O (of IO ;)). The issue is not
> so much about "1" (the code it takes) but the fact that it doesn't
> integrate with runner features and retries: what happens if a bundle has a
> failure? => undefined today. 2. I'm fine with it as long as we know
> exactly what happens when we restart after a bundle failure. With ES the
> timestamp can be used, for instance.

This deterministic batching can be achieved even now with an extra GroupByKey (and if you want ordering on top of that, you will need another GBK). I don't know if that is too costly in your case. I would need a bit more detail on how ES IO write retries are handled to see if it could be simplified. Note that retries occur with or without any failures in your DoFn.

The biggest negative with the GBK approach is that it doesn't provide the same guarantees on Flink.

I don't see how GroupIntoBatches in Beam provides specific guarantees on deterministic batches.

> Thinking about it, the SDF is really a way to do it, since the SDF will
> manage the bulking; associated with the runner "retry", it seems to cover
> the needs.
>
> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
>> I must admit I'm still failing to understand the problem, so let's step
>> back even further.
>>
>> Could you give an example of an IO that is currently difficult to
>> implement specifically because of the lack of the feature you're talking
>> about?
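The deterministic-id idea behind the Elasticsearch discussion - why a replayed write need not duplicate - can be sketched concretely: derive a stable document id from the record itself, so that a retry upserts the same document instead of creating a new one. Plain Java; the map stands in for any store with upsert semantics, as ES has:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

// Sketch of idempotent writes via deterministic ids: the id is a pure
// function of the record, so writing the same record twice (e.g. on a
// bundle retry) overwrites the same entry rather than duplicating it.
public class IdempotentUpsert {
    public static String docId(String record) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(record.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : hash) sb.append(String.format("%02x", b)); // hex-encode
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    public static void write(Map<String, String> store, String record) {
        store.put(docId(record), record); // upsert: a replay is a no-op
    }
}
```

This is why Eugene argues batches need not be deterministic for exactly-once writes: dedup happens per record, regardless of how the records were grouped into batches.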
>> >> >>>>>>>> >> >> >>>>>>>> I'm asking because I've reviewed almost all Beam IOs and don't >> >> >>>> recall >> >> >>>>>>>> seeing a similar problem. Sure, a lot of IOs do batching >> >> >>>>>>>> within a >> >> >>>>>> bundle, >> >> >>>>>>>> but 1) it doesn't take up much code (granted, it would be even >> >> >>>> easier >> >> >>>>>> if >> >> >>>>>>>> Beam did it for us) and 2) I don't remember any of them >> >> >>>>>>>> requiring >> >> >>>> the >> >> >>>>>>>> batches to be deterministic, and I'm having a hard time >> >> >>>>>>>> imagining >> >> >>>>> what >> >> >>>>>>> kind >> >> >>>>>>>> of storage system would be able to deduplicate if batches were >> >> >>>>>>>> deterministic but wouldn't be able to deduplicate if they >> >> >>>>>>>> weren't. >> >> >>>>>>>> >> >> >>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau < >> >> >>>>>>> rmannibu...@gmail.com> >> >> >>>>>>>> wrote: >> >> >>>>>>>> >> >> >>>>>>>>> Ok, let me try to step back and summarize what we have today >> >> >>>>>>>>> and >> >> >>>>> what >> >> >>>>>> I >> >> >>>>>>>>> miss: >> >> >>>>>>>>> >> >> >>>>>>>>> 1. we can handle chunking in beam through group in batch (or >> >> >>>>>> equivalent) >> >> >>>>>>>>> but: >> >> >>>>>>>>> > it is not built-in into the transforms (IO) and it is >> >> >>>>> controlled >> >> >>>>>>>>> from outside the transforms so no way for a transform to do >> >> >>>>>>>>> it >> >> >>>>>>>>> properly without handling itself a composition and links >> >> >>>>>>>>> between >> >> >>>>>>>>> multiple dofns to have notifications and potentially react >> >> >>>> properly >> >> >>>>> or >> >> >>>>>>>>> handle backpressure from its backend >> >> >>>>>>>>> 2. there is no restart feature because there is no real state >> >> >>>>> handling >> >> >>>>>>>>> at the moment. 
this sounds fully delegated to the runner but >> >> >>>>>>>>> I >> >> >>>>>>>>> was >> >> >>>>>>>>> hoping to have more guarantees from the used API to be able >> >> >>>>>>>>> to >> >> >>>>> restart >> >> >>>>>>>>> a pipeline (mainly batch since it can be irrelevant or >> >> >>>>>>>>> delegates >> >> >>>> to >> >> >>>>>>>>> the backend for streams) and handle only not commited records >> >> >>>>>>>>> so >> >> >>>> it >> >> >>>>>>>>> requires some persistence outside the main IO storages to do >> >> >>>>>>>>> it >> >> >>>>>>>>> properly >> >> >>>>>>>>> > note this is somehow similar to the monitoring topic >> >> >>>>>>>>> which >> >> >>>> miss >> >> >>>>>>>>> persistence ATM so it can end up to beam to have a pluggable >> >> >>>> storage >> >> >>>>>>>>> for a few concerns >> >> >>>>>>>>> >> >> >>>>>>>>> >> >> >>>>>>>>> Short term I would be happy with 1 solved properly, long term >> >> >>>>>>>>> I >> >> >>>> hope >> >> >>>>> 2 >> >> >>>>>>>>> will be tackled without workarounds requiring custom wrapping >> >> >>>>>>>>> of >> >> >>>> IO >> >> >>>>> to >> >> >>>>>>>>> use a custom state persistence. >> >> >>>>>>>>> >> >> >>>>>>>>> >> >> >>>>>>>>> >> >> >>>>>>>>> Romain Manni-Bucau >> >> >>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >>>>>>>>> >> >> >>>>>>>>> >> >> >>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré >> >> >>>>>>>>> <j...@nanthrax.net>: >> >> >>>>>>>>>> Thanks for the explanation. Agree, we might talk about >> >> >>>>>>>>>> different >> >> >>>>>>> things >> >> >>>>>>>>>> using the same wording. >> >> >>>>>>>>>> >> >> >>>>>>>>>> I'm also struggling to understand the use case (for a >> >> >>>>>>>>>> generic >> >> >>>>> DoFn). 
>> >> >>>>>>>>>> >> >> >>>>>>>>>> Regards >> >> >>>>>>>>>> JB >> >> >>>>>>>>>> >> >> >>>>>>>>>> >> >> >>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote: >> >> >>>>>>>>>>> >> >> >>>>>>>>>>> To avoid spending a lot of time pursuing a false path, I'd >> >> >>>>>>>>>>> like >> >> >>>>> to >> >> >>>>>>> say >> >> >>>>>>>>>>> straight up that SDF is definitely not going to help here, >> >> >>>>> despite >> >> >>>>>>> the >> >> >>>>>>>>>>> fact >> >> >>>>>>>>>>> that its API includes the term "checkpoint". In SDF, the >> >> >>>>>> "checkpoint" >> >> >>>>>>>>>>> captures the state of processing within a single element. >> >> >>>>>>>>>>> If >> >> >>>>> you're >> >> >>>>>>>>>>> applying an SDF to 1000 elements, it will, like any other >> >> >>>>>>>>>>> DoFn, >> >> >>>>> be >> >> >>>>>>>>> applied >> >> >>>>>>>>>>> to each of them independently and in parallel, and you'll >> >> >>>>>>>>>>> have >> >> >>>>> 1000 >> >> >>>>>>>>>>> checkpoints capturing the state of processing each of these >> >> >>>>>> elements, >> >> >>>>>>>>>>> which >> >> >>>>>>>>>>> is probably not what you want. >> >> >>>>>>>>>>> >> >> >>>>>>>>>>> I'm afraid I still don't understand what kind of checkpoint >> >> >>>>>>>>>>> you >> >> >>>>>>> need, if >> >> >>>>>>>>>>> it >> >> >>>>>>>>>>> is not just deterministic grouping into batches. >> >> >>>>>>>>>>> "Checkpoint" >> >> >>>> is >> >> >>>>> a >> >> >>>>>>> very >> >> >>>>>>>>>>> broad term and it's very possible that everybody in this >> >> >>>>>>>>>>> thread >> >> >>>>> is >> >> >>>>>>>>> talking >> >> >>>>>>>>>>> about different things when saying it. 
So it would help if >> >> >>>>>>>>>>> you >> >> >>>>>> could >> >> >>>>>>>>> give >> >> >>>>>>>>>>> a >> >> >>>>>>>>>>> more concrete example: for example, take some IO that you >> >> >>>>>>>>>>> think >> >> >>>>>>> could be >> >> >>>>>>>>>>> easier to write with your proposed API, give the contents >> >> >>>>>>>>>>> of a >> >> >>>>>>>>>>> hypothetical >> >> >>>>>>>>>>> PCollection being written to this IO, give the code of a >> >> >>>>>> hypothetical >> >> >>>>>>>>> DoFn >> >> >>>>>>>>>>> implementing the write using your API, and explain what >> >> >>>>>>>>>>> you'd >> >> >>>>>> expect >> >> >>>>>>> to >> >> >>>>>>>>>>> happen at runtime. >> >> >>>>>>>>>>> >> >> >>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau >> >> >>>>>>>>>>> <rmannibu...@gmail.com> >> >> >>>>>>>>>>> wrote: >> >> >>>>>>>>>>> >> >> >>>>>>>>>>>> @Eugene: yes and the other alternative of Reuven too but >> >> >>>>>>>>>>>> it >> >> >>>>>>>>>>>> is >> >> >>>>>> still >> >> >>>>>>>>>>>> 1. relying on timers, 2. not really checkpointed >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> In other words it seems all solutions are to create a >> >> >>>>>>>>>>>> chunk >> >> >>>>>>>>>>>> of >> >> >>>>>> size >> >> >>>>>>> 1 >> >> >>>>>>>>>>>> and replayable to fake the lack of chunking in the >> >> >>>>>>>>>>>> framework. >> >> >>>>> This >> >> >>>>>>>>>>>> always implies a chunk handling outside the component >> >> >>>> (typically >> >> >>>>>>>>>>>> before for an output). My point is I think IO need it in >> >> >>>>>>>>>>>> their >> >> >>>>> own >> >> >>>>>>>>>>>> "internal" or at least control it themselves since the >> >> >>>>>>>>>>>> chunk >> >> >>>>> size >> >> >>>>>> is >> >> >>>>>>>>>>>> part of the IO handling most of the time. >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> I think JB spoke of the same "group before" trick using >> >> >>>>>> restrictions >> >> >>>>>>>>>>>> which can work I have to admit if SDF are implemented by >> >> >>>>> runners. 
>> >> >>>>>> Is >> >> >>>>>>>>>>>> there a roadmap/status on that? Last time I checked SDF >> >> >>>>>>>>>>>> was a >> >> >>>>>> great >> >> >>>>>>>>>>>> API without support :(. >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> Romain Manni-Bucau >> >> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov >> >> >>>>>>>>>>>> <kirpic...@google.com.invalid>: >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are >> >> >>>>>>>>>>>>> unrelated, >> >> >>>>> and >> >> >>>>>>> the >> >> >>>>>>>>>>>>> post >> >> >>>>>>>>>>>>> doesn't mention the word. Did you mean something else, >> >> >>>>>>>>>>>>> e.g. >> >> >>>>>>>>> restriction >> >> >>>>>>>>>>>>> perhaps? Either way I don't think SDFs are the solution >> >> >>>>>>>>>>>>> here; >> >> >>>>>> SDFs >> >> >>>>>>>>> have >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> to >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> do with the ability to split the processing of *a single >> >> >>>>> element* >> >> >>>>>>> over >> >> >>>>>>>>>>>>> multiple calls, whereas Romain I think is asking for >> >> >>>> repeatable >> >> >>>>>>>>> grouping >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> of >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> *multiple* elements. >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> Romain - does >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> >> >> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/ >> >> >>>>>>> core/src/main/java/org/apache/beam/sdk/transforms/ >> >> >>>> GroupIntoBatches.java >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> do what >> >> >>>>>>>>>>>>> you want? >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré < >> >> >>>>>>>>> j...@nanthrax.net> >> >> >>>>>>>>>>>>> wrote: >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, no >> >> >>>>>>>>>>>>>> ? 
>> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn. >> >> >>>> html >> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> Regards >> >> >>>>>>>>>>>>>> JB >> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote: >> >> >>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>> it gives the fn/transform the ability to save a state - >> >> >>>>>>>>>>>>>>> it >> >> >>>>> can >> >> >>>>>>> get >> >> >>>>>>>>>>>>>>> back on "restart" / whatever unit we can use, probably >> >> >>>> runner >> >> >>>>>>>>>>>>>>> dependent? Without that you need to rewrite all IO >> >> >>>>>>>>>>>>>>> usage >> >> >>>> with >> >> >>>>>>>>>>>>>>> something like the previous pattern which makes the IO >> >> >>>>>>>>>>>>>>> not >> >> >>>>> self >> >> >>>>>>>>>>>>>>> sufficient and kind of makes the entry cost and usage >> >> >>>>>>>>>>>>>>> of >> >> >>>> beam >> >> >>>>>> way >> >> >>>>>>>>>>>>>>> further. >> >> >>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>> In my mind it is exactly what jbatch/spring-batch uses >> >> >>>>>>>>>>>>>>> but >> >> >>>>>>> adapted >> >> >>>>>>>>> to >> >> >>>>>>>>>>>>>>> beam (stream in particular) case. >> >> >>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>> Romain Manni-Bucau >> >> >>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax >> >> >>>>> <re...@google.com.invalid >> >> >>>>>>> : >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> Romain, >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are >> >> >>>>>>>>>>>>>>>> the >> >> >>>>>>> semantics, >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> what >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> does it accomplish? 
>> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> Reuven >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau < >> >> >>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>> rmannibu...@gmail.com> >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>> wrote: >> >> >>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> Yes, what I propose earlier was: >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> I. checkpoint marker: >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> @AnyBeamAnnotation >> >> >>>>>>>>>>>>>>>>> @CheckpointAfter >> >> >>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx); >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new >> >> >>>>>>>>> MyFn()).withCheckpointAlgorithm(new >> >> >>>>>>>>>>>>>>>>> CountingAlgo())) >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> III. (I like this one less) >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> // in the dofn >> >> >>>>>>>>>>>>>>>>> @CheckpointTester >> >> >>>>>>>>>>>>>>>>> public boolean shouldCheckpoint(); >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in >> >> >>>>>>>>>>>>>>>>> the >> >> >>>> dofn >> >> >>>>>> per >> >> >>>>>>>>>>>> >> >> >>>>>>>>>>>> element >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> Romain Manni-Bucau >> >> >>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi >> >> >>>>>>> <rang...@google.com.invalid >> >> >>>>>>>>>>>>> >> >> >>>>>>>>>>>>> : >> >> >>>>>>>>>>>>>>>>>> >> >> >>>>>>>>>>>>>>>>>> How would you define it (rough API is fine)?. 
>>>> Without more details, it is not easy to see wider applicability and feasibility in runners.
>>>>
>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> This is a fair summary of the current state, but also where Beam can have a very strong added value and make big data great and smooth.
>>>>>
>>>>> Instead of this replay feature, isn't checkpointing viable? In particular with SDF, no?
>>>>>
>>>>> On 16 Nov 2017 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:
>>>>>> The core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to the checkpoint on the external source). Runners do checkpoint internally as an implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.
>>>>>>
>>>>>> @StableReplay helps, but it does not explicitly talk about a checkpoint by design.
>>>>>>
>>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements.
>>>>>> I worked on an exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to elements within each shard. Duplicates in replays are avoided based on these sequence numbers. The DoFn state API is used to buffer out-of-order replays. The implementation strategy works in Dataflow but not in Flink, which has a horizontal checkpoint. KafkaIO checks for compatibility.
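The sequence-number deduplication Raghu describes can be sketched in miniature as below. This is an illustrative in-memory version only: the real KafkaIO EOS sink keeps this state in the Beam state API so it survives retries, and additionally buffers out-of-order replays, which this simplified sketch assumes away.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the dedup idea behind KafkaIO.write().withEOS():
// elements are resharded, each shard assigns increasing sequence numbers,
// and a replayed element is dropped when its sequence number was already
// committed. Assumes in-order delivery per shard; the real sink buffers
// out-of-order replays via the DoFn state API.
public class SequenceDedup {
    // Highest sequence number committed so far, per shard.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Returns true if the element is new and should be written; false on a replay. */
    public boolean shouldWrite(int shard, long sequence) {
        long last = committed.getOrDefault(shard, -1L);
        if (sequence <= last) {
            return false; // duplicate from a replay
        }
        committed.put(shard, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceDedup dedup = new SequenceDedup();
        System.out.println(dedup.shouldWrite(0, 0)); // true: first write
        System.out.println(dedup.shouldWrite(0, 1)); // true: next element
        System.out.println(dedup.shouldWrite(0, 1)); // false: replayed element
    }
}
```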
>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> The subject is a bit provocative, but the topic is real and coming up again and again with Beam usage: how can a DoFn handle some "chunking"?
>>>>>>>
>>>>>>> The need is to be able to commit every N records, but with N not too big.
>>>>>>>
>>>>>>> The natural API for that in Beam is the bundle one, but bundles are not reliable since they can be very small (Flink) - we can say it is "ok" even if it has some perf impact - or too big (Spark does full size / #workers).
>>>>>>> The workaround is what we see in the ES I/O: a maxSize which does an eager flush. The issue is that the checkpoint is then not respected and you can process the same records multiple times.
>>>>>>>
>>>>>>> Any plan to make this API reliable and controllable from a Beam point of view (at least in a "max" manner)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
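The "maxSize eager flush" workaround Romain refers to (the pattern the Elasticsearch I/O uses) can be sketched as a plain buffer, with the caveat he raises: an eager flush is a side effect outside the checkpoint, so a replayed bundle can re-send already-flushed records. `MaxSizeBuffer` is an illustrative name, not a Beam class; in a real DoFn, `add` would be driven from `@ProcessElement` and the final `flush` from `@FinishBundle`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the maxSize eager-flush workaround: buffer writes and flush
// every N records, plus once more when the bundle finishes. Illustrative
// only; note that records flushed eagerly mid-bundle may be re-processed
// if the bundle is replayed, which is exactly the problem discussed above.
public class MaxSizeBuffer<T> {
    private final int maxSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    public MaxSizeBuffer(int maxSize, Consumer<List<T>> flusher) {
        this.maxSize = maxSize;
        this.flusher = flusher;
    }

    /** Called per element, e.g. from @ProcessElement. */
    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush(); // eager flush: happens before any checkpoint
        }
    }

    /** Called when the bundle ends, e.g. from @FinishBundle. */
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        MaxSizeBuffer<String> sink =
            new MaxSizeBuffer<>(2, batch -> System.out.println("flush " + batch));
        sink.add("a");
        sink.add("b"); // size reaches 2: eager flush of [a, b]
        sink.add("c");
        sink.flush(); // bundle end flushes the remainder [c]
    }
}
```

The GroupIntoBatches transform mentioned earlier in the thread addresses the batching half of this (bounding batch size) without the DoFn doing its own eager flushing.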