Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner or the shared runner code; it should be deduped. Which runner are you using? Can you file a JIRA against me to investigate? I'm still in the process of fleshing out more and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add one for this. (Existing tests already OOMed without deduping, so it wasn't at the top of my priority list.)

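For what it's worth, here is a minimal sketch of what "deduped" means here, in plain Java rather than Beam code. `TimerStore`, `set` and `fireUpTo` are hypothetical names made up for illustration, not the runner's actual internals; the only assumption carried over from the discussion is that a timer is scoped to a key/window pair, so setting it again replaces the earlier setting instead of adding a second firing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class TimerDedupSketch {

    // Hypothetical stand-in for a runner-side timer store (not a Beam class).
    static final class TimerStore {
        // One pending timer per key/window scope; a later set() overwrites the earlier one.
        private final Map<String, Long> pending = new HashMap<>();

        void set(String key, String window, long fireAtMillis) {
            pending.put(key + "|" + window, fireAtMillis);
        }

        // Returns the scopes whose timer is due at the given watermark and removes them.
        List<String> fireUpTo(long watermarkMillis) {
            List<String> fired = new ArrayList<>();
            Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (entry.getValue() <= watermarkMillis) {
                    fired.add(entry.getKey());
                    it.remove();
                }
            }
            return fired;
        }
    }

    public static void main(String[] args) {
        TimerStore store = new TimerStore();
        // Three elements for the same key/window each set the timer.
        for (int i = 0; i < 3; i++) {
            store.set("user-1", "window-0", 60_000L);
        }
        // Only one scope is pending, so only one firing is reported.
        System.out.println(store.fireUpTo(60_000L));
    }
}
```

With this behaviour the extra boolean state is unnecessary: calling `set` from every element is harmless because the scope holds at most one pending timer.
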
If the end user doesn't have a natural key, I would just add one and remove it within your transform. I'm not sure how easy this will be; you might need user intervention. Of course, you still need to shard, or you'll be processing the whole PCollection serially.

Kenn

On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi
>
> AFAIR the timer per function is in the "roadmap" (remembering the
> discussion we had with Kenn).
>
> I will take a deeper look at your branch next week.
>
> Regards
> JB
>
> On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot <echauc...@gmail.com> wrote:
> >Hi Kenn,
> >
> >I have started using the state and timer APIs; they seem awesome!
> >
> >Please take a look at
> >https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
> >
> >It contains a PTransform that batches across bundles while respecting
> >windows (the tests are not finished yet; see the @Ignore annotations
> >and TODOs).
> >
> >I have some questions:
> >
> >- I use the timer to detect the end of the window, like you suggested.
> >But the timer can only be set in @ProcessElement and @OnTimer. The
> >Javadoc says that timers are implicitly scoped to a key/window and that
> >a timer can be set only for a single time per scope. I noticed that if I
> >call timer.setForNowPlus in the @ProcessElement method, the timer seems
> >to be set n times for n elements. So I created a boolean state to
> >prevent setting the timer more than once per key/window.
> >=> Would it be good to give the end user a way of indicating that the
> >timer will be set only once per key/window? Something analogous to
> >@Setup, to avoid the user having to use a boolean state?
> >
> >- I understand that state and timers need to be per-key, but what if the
> >end user does not need a key (let's say he just needs a
> >PCollection<String>)? Do we tell him to use a PCollection<KV> anyway,
> >like I wrote in the Javadoc of BatchingParDo?
> >
> >WDYT?
> >
> >Thanks,
> >
> >Etienne
> >
> >
> >On 26/01/2017 at 17:28, Etienne Chauchot wrote:
> >> Wonderful!
> >>
> >> Thanks Kenn!
> >>
> >> Etienne
> >>
> >>
> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> >>> Hi Etienne,
> >>>
> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >>> arrived. I thought I would try to quickly unblock you by responding
> >>> with a TL;DR: you can achieve your goals with state & timers as they
> >>> currently exist. You'll set a timer for
> >>> window.maxTimestamp().plus(allowedLateness) precisely. When this
> >>> timer fires, you are guaranteed that the input watermark has exceeded
> >>> this point (so all new data is droppable) while the output timestamp
> >>> is held to this point (so you can safely output into the window).
> >>>
> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >>> handle on the allowed lateness (not a problem in your case) and (2)
> >>> actually meaningful and potentially less expensive to implement in
> >>> the absence of state (this is why it needs a design discussion at
> >>> all, really).
> >>>
> >>> Caveat: these APIs are new and not supported in every runner and
> >>> windowing configuration.
> >>>
> >>> Kenn
> >>>
> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I have started to implement this ticket. For now it is implemented
> >>>> as a PTransform that simply does ParDo.of(new DoFn), and all the
> >>>> processing related to batching is done in the DoFn.
> >>>>
> >>>> I'm starting to deal with windows and bundles (starting to take a
> >>>> look at the State API to process across bundles; more questions
> >>>> about this to come).
> >>>> My comments/questions are inline:
> >>>>
> >>>>
> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> >>>>
> >>>>> We should start by understanding the goals.
> >>>>> If elements are in different windows, can they end up in the same
> >>>>> batch? If they have different timestamps, what timestamp should
> >>>>> the batch have?
> >>>>>
> >>>> Regarding timestamps: the current design is as follows. The
> >>>> transform does not group elements in the PCollection, so the "batch"
> >>>> does not exist as an element in the PCollection. There is only a
> >>>> user-defined function (perBatchFn) that gets called when batchSize
> >>>> elements have been processed. This function takes an ArrayList as a
> >>>> parameter. So elements keep their original timestamps.
> >>>>
> >>>>
> >>>> Regarding windowing: I guess that if elements are not in the same
> >>>> window, they are not expected to be in the same batch.
> >>>> I'm just starting to work on these subjects, so I might lack a bit
> >>>> of information. What I am currently thinking is that I need a way to
> >>>> know in the DoFn that the window has expired, so that I can call
> >>>> perBatchFn even if batchSize has not been reached. This is the
> >>>> @OnWindowExpiration callback that Kenneth mentioned in an email
> >>>> about bundles.
> >>>> Let's imagine that we have a collection of elements artificially
> >>>> timestamped every 10 seconds (for simplicity of the example) and
> >>>> fixed windowing of 1 minute. Then each window contains 6 elements.
> >>>> If we were to buffer the elements in batches of 5, then for each
> >>>> window we expect to get 2 batches (one of 5 elements, one of 1
> >>>> element). For that to happen, we need an @OnWindowExpiration on the
> >>>> DoFn where we call perBatchFn.
> >>>>
> >>>>> As a composite transform this will likely require a group by key,
> >>>>> which may affect performance. Maybe within a DoFn is better.
> >>>>>
> >>>> Yes, the processing is indeed done with a DoFn.
> >>>>
> >>>>> Then it could be some annotation or API that informs the runner.
> >>>>> Should batch sizes be fixed in the annotation (element count or
> >>>>> size), or should the user have some method that lets them decide
> >>>>> when to process a batch based on the contents?
> >>>>>
> >>>> For now, the user passes batchSize as an argument to
> >>>> BatchParDo.via(); it is a number of elements. But batching based on
> >>>> content might be useful for the user. Giving a hint to the runner
> >>>> might be more flexible for the runner. Thanks.
> >>>>
> >>>>> Another thing to think about is whether this should be connected
> >>>>> to the ability to run parts of the bundle in parallel.
> >>>>>
> >>>> Yes!
> >>>>
> >>>>> Maybe each batch is an RPC and you just want to start an async RPC
> >>>>> for each batch. Then, in addition to starting the final RPC in
> >>>>> finishBundle, you also need to wait for all the RPCs to complete.
> >>>>>
> >>>> Actually, each batch's processing is currently whatever the user
> >>>> wants (the user-defined perBatchFn function). If the user decides to
> >>>> issue an async RPC in that function (called with the ArrayList of
> >>>> input elements), IMHO he is responsible for waiting for the response
> >>>> in that method if he needs it, but he can also do a send-and-forget,
> >>>> depending on his use case.
> >>>>
> >>>> Besides, I have also included a perElementFn user function to allow
> >>>> the user to do some processing on the elements before adding them
> >>>> to the batch (example use case: converting a String to a DTO object
> >>>> to invoke an external service).
> >>>>
> >>>> Etienne
> >>>>
> >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
> >>>>>
> >>>>> Hi JB,
> >>>>>
> >>>>> I meant a JIRA vote, but discussion on the ML works too :)
> >>>>>
> >>>>> As I understand the need (see the Stack Overflow links in the JIRA
> >>>>> ticket), the aim is to avoid the user having to code the batching
> >>>>> logic in his own DoFn.processElement() and DoFn.finishBundle(),
> >>>>> regardless of the bundles. For example, a possible use case is to
> >>>>> batch calls to an external service (for performance).
> >>>>>
> >>>>> I was thinking about providing a PTransform that implements the
> >>>>> batching in its own DoFn and that takes user-defined functions for
> >>>>> customization.
> >>>>>
> >>>>> Etienne
> >>>>>
> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> >>>>>
> >>>>>> Hi
> >>>>>>
> >>>>>> I guess you mean discussion on the mailing list about that,
> >>>>>> right?
> >>>>>>
> >>>>>> AFAIR the idea is to provide a utility class to deal with
> >>>>>> pooling/batching. However, I'm not sure it's required, given
> >>>>>> @StartBundle etc. in DoFn, and batching depends on the end user's
> >>>>>> "logic".
> >>>>>>
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I have started to work on this ticket:
> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
> >>>>>>>
> >>>>>>> As there were no votes since March 18th, is the issue still
> >>>>>>> relevant/needed?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> Etienne
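
The windowed batching example quoted in the thread (elements timestamped every 10 seconds, 1-minute fixed windows, batchSize of 5) can be checked with a small plain-Java sketch. This is not Beam code and not the BatchingParDo implementation from the branch; `WindowedBatchingSketch` and `batchPerWindow` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and the final loop merely stands in for the window-expiration callback discussed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowedBatchingSketch {

    // Assigns each timestamp to a fixed window, emits a batch whenever a window's
    // buffer reaches batchSize, and flushes leftovers when the window "expires".
    static Map<Long, List<List<Long>>> batchPerWindow(
            List<Long> timestampsMillis, long windowSizeMillis, int batchSize) {
        Map<Long, List<Long>> buffers = new TreeMap<>();
        Map<Long, List<List<Long>>> batches = new TreeMap<>();

        for (long ts : timestampsMillis) {
            long windowStart = ts - (ts % windowSizeMillis);
            List<Long> buffer = buffers.computeIfAbsent(windowStart, w -> new ArrayList<>());
            buffer.add(ts);
            if (buffer.size() == batchSize) {
                batches.computeIfAbsent(windowStart, w -> new ArrayList<>())
                        .add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        // Stand-in for window expiration: emit whatever is still buffered per window.
        for (Map.Entry<Long, List<Long>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                batches.computeIfAbsent(entry.getKey(), w -> new ArrayList<>())
                        .add(new ArrayList<>(entry.getValue()));
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> timestamps = new ArrayList<>();
        for (long t = 0; t < 120_000; t += 10_000) {
            timestamps.add(t); // one element every 10 seconds, two minutes of data
        }
        batchPerWindow(timestamps, 60_000, 5)
                .forEach((window, b) -> System.out.println(window + " -> " + b));
    }
}
```

Each one-minute window holds six elements, so it produces one batch of five when the buffer fills and one batch of a single element at expiration, matching the expectation stated in the thread.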