On Wed, Feb 8, 2017 at 10:48 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> Hi Etienne,
>
> If the timer is firing n times for n elements, that's a bug in the runner /
> shared runner code. It should be deduped. Which runner? Can you file a JIRA
> against me to investigate? I'm still in the process of fleshing out more
> and more RunnableOnService (aka ValidatesRunner) tests so I will surely add
> one (existing tests already OOMed without deduping, so it wasn't at the top
> of my priority list).
>
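A plain-Java model (no Beam dependency) of the deduplication Kenn describes, assuming a timer is identified by its key, window and timer id, so setting it again moves the pending firing time instead of adding a second one. `TimerTable`, `set` and `fireUpTo` are hypothetical names for this illustration, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of per-scope timers: one pending firing time per (key, window, timerId).
// Not Beam API; it only illustrates why n calls to set() should yield a single firing.
final class TimerTable {
    // Scope string -> pending firing timestamp (millis). Re-setting overwrites the entry.
    private final Map<String, Long> pending = new TreeMap<>();

    static String scope(String key, String window, String timerId) {
        return key + "|" + window + "|" + timerId;
    }

    // Setting a timer that is already pending just moves it; it never duplicates it.
    void set(String key, String window, String timerId, long timestampMillis) {
        pending.put(scope(key, window, timerId), timestampMillis);
    }

    // Fires (and removes) every pending timer whose timestamp is <= the watermark.
    List<String> fireUpTo(long watermarkMillis) {
        List<String> fired = new ArrayList<>();
        pending.entrySet().removeIf(entry -> {
            if (entry.getValue() <= watermarkMillis) {
                fired.add(entry.getKey());
                return true;
            }
            return false;
        });
        return fired;
    }
}
```

With overwrite semantics like these, the state boolean Etienne mentions is only a workaround for a runner that does not dedupe.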
> If the end user doesn't have a natural key, I would just add one and remove
> it within your transform. Not sure how easy this will be - you might need
> user intervention. Of course, you still do need to shard or you'll be
> processing the whole PCollection serially.
>
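A plain-Java sketch of Kenn's suggestion, assuming the synthetic key is a hash-based shard number: attach a key so per-key state can be used, group by it, then drop it before handing results back. `SyntheticKeys`, `numShards` and the hashing scheme are illustrative assumptions; the number of shards bounds how much of the PCollection is processed in parallel.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: gives un-keyed elements a synthetic shard key so that per-key
// state/timers can be used, then removes the key again. Not part of Beam.
final class SyntheticKeys {
    // Pairs each element with a shard number in [0, numShards) derived from its hash.
    static <T> List<Map.Entry<Integer, T>> addShardKey(List<T> elements, int numShards) {
        List<Map.Entry<Integer, T>> keyed = new ArrayList<>();
        for (T element : elements) {
            int shard = Math.floorMod(element.hashCode(), numShards);
            keyed.add(new AbstractMap.SimpleImmutableEntry<>(shard, element));
        }
        return keyed;
    }

    // Groups values by shard, standing in for the runner's per-key partitioning.
    static <T> Map<Integer, List<T>> groupByShard(List<Map.Entry<Integer, T>> keyed) {
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (Map.Entry<Integer, T> kv : keyed) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    // Drops the synthetic key so callers get back their original element type.
    static <T> List<T> dropKeys(List<Map.Entry<Integer, T>> keyed) {
        List<T> values = new ArrayList<>();
        for (Map.Entry<Integer, T> kv : keyed) {
            values.add(kv.getValue());
        }
        return values;
    }
}
```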

We should also provide the algorithm without using timers/state, as adding
(then sharding by) the key is likely to be expensive.
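One reading of "the algorithm without timers/state" is a bundle-scoped buffer: batch in processElement and flush the remainder in finishBundle. The plain-Java model below uses a hypothetical `BundleBatcher` and explicit bundle lists as stand-ins for a DoFn and the runner's bundling. It shows the trade-off: no key, shuffle or state is needed, but batches cannot span bundles, so their sizes depend on how the runner splits the input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a DoFn that batches only within a bundle (no state, no timers).
final class BundleBatcher<T> {
    private final int batchSize;
    private final List<List<T>> emitted = new ArrayList<>();
    private List<T> buffer = new ArrayList<>();

    BundleBatcher(int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
    }

    // Analogue of @StartBundle: begin with an empty buffer.
    void startBundle() {
        buffer = new ArrayList<>();
    }

    // Analogue of @ProcessElement: buffer and flush once batchSize is reached.
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Analogue of @FinishBundle: flush whatever is left, even a partial batch.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            emitted.add(buffer);
            buffer = new ArrayList<>();
        }
    }

    List<List<T>> batches() {
        return emitted;
    }

    // Feeds pre-split bundles through the lifecycle and returns the batches produced.
    static <T> List<List<T>> run(List<List<T>> bundles, int batchSize) {
        BundleBatcher<T> batcher = new BundleBatcher<>(batchSize);
        for (List<T> bundle : bundles) {
            batcher.startBundle();
            for (T element : bundle) {
                batcher.processElement(element);
            }
            batcher.finishBundle();
        }
        return batcher.batches();
    }
}
```

The same seven elements yield `[[1, 2], [3], [4, 5], [6, 7]]` when split into bundles of 3 and 4, but `[[1, 2], [3, 4], [5, 6], [7]]` as a single bundle.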


> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi
> >
> > AFAIR the timer per function is in the "roadmap" (I remember a discussion
> > we had with Kenn).
> >
> > I will take a deeper look at your branch next week.
> >
> > Regards
> > JB
> >
> > On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot <echauc...@gmail.com>
> > wrote:
> > >Hi Kenn,
> > >
> > >I have started using state and timer APIs, they seem awesome!
> > >
> > >Please take a look at
> > >https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
> > >
> > >It contains a PTransform that does the batching across bundles while
> > >respecting the windows (even if tests are not finished yet; see @Ignore
> > >and TODOs).
> > >
> > >  I have some questions:
> > >
> > >- I use the timer to detect the end of the window like you suggested.
> > >But the timer can only be set in @ProcessElement and @OnTimer. The Javadoc
> > >says that timers are implicitly scoped to a key/window and that a timer
> > >can be set for only a single time per scope. I noticed that if I call
> > >timer.setForNowPlus in the @ProcessElement method, it seems that the
> > >timer is set n times for n elements. So I just created a state with a
> > >boolean to prevent setting the timer more than once per key/window.
> > >=> Would it be good to have an end-user way of indicating that the
> > >timer will be set only once per key/window? Something analogous to
> > >@Setup, to avoid the user having to use a state boolean?
> > >
> > >- I understand that state and timers need to be per-key, but what if the
> > >end user does not need a key (let's say he just needs a
> > >PCollection<String>)? Do we then tell him to use a PCollection<KV>
> > >anyway, like I wrote in the javadoc of BatchingParDo?
> > >
> > >WDYT?
> > >
> > >Thanks,
> > >
> > >Etienne
> > >
> > >
> > >On 26/01/2017 at 17:28, Etienne Chauchot wrote:
> > >> Wonderful!
> > >>
> > >> Thanks, Kenn!
> > >>
> > >> Etienne
> > >>
> > >>
> > >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> > >>> Hi Etienne,
> > >>>
> > >>> I was drafting a proposal about @OnWindowExpiration when this email
> > >>> arrived. I thought I would try to quickly unblock you by responding
> > >>> with a TL;DR: you can achieve your goals with state & timers as they
> > >>> currently exist. You'll set a timer for
> > >>> window.maxTimestamp().plus(allowedLateness) precisely - when this
> > >>> timer fires, you are guaranteed that the input watermark has exceeded
> > >>> this point (so all new data is droppable) while the output timestamp
> > >>> is held to this point (so you can safely output into the window).
> > >>>
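To make the arithmetic of Kenn's suggestion concrete: for a fixed window [start, end), Beam's `IntervalWindow.maxTimestamp()` is the end minus one millisecond, so the expiry timer goes at end - 1 ms + allowedLateness. This is a plain-Java sketch with `java.time` (Beam itself uses Joda-Time); `WindowExpiry` and its methods are hypothetical helpers, zero window offset is assumed, and the 30-second lateness in the usage note is an arbitrary example value.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helpers illustrating the timestamp Kenn suggests for the expiry timer.
final class WindowExpiry {
    // Start of the fixed window (no offset) that contains the given timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long startMillis = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(startMillis);
    }

    // Mirrors IntervalWindow.maxTimestamp(): the last millisecond inside [start, end).
    static Instant maxTimestamp(Instant windowStart, Duration size) {
        return windowStart.plus(size).minusMillis(1);
    }

    // Timer timestamp: window.maxTimestamp().plus(allowedLateness).
    static Instant expiryTimer(Instant timestamp, Duration size, Duration allowedLateness) {
        return maxTimestamp(windowStart(timestamp, size), size).plus(allowedLateness);
    }
}
```

For an element at 75 s in 1-minute windows, the window is [60 s, 120 s), its maxTimestamp is 119.999 s, and with 30 s of allowed lateness the timer lands at 149.999 s.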
> > >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> > >>> handle on the allowed lateness (not a problem in your case) and (2)
> > >>> actually meaningful and potentially less expensive to implement in the
> > >>> absence of state (this is why it needs a design discussion at all,
> > >>> really).
> > >>>
> > >>> Caveat: these APIs are new and not supported in every runner and
> > >>> windowing configuration.
> > >>>
> > >>> Kenn
> > >>>
> > >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> I have started to implement this ticket. For now it is implemented as
> > >>>> a PTransform that simply does ParDo.of(new DoFn), and all the
> > >>>> processing related to batching is done in the DoFn.
> > >>>>
> > >>>> I'm starting to deal with windows and bundles (starting to take a
> > >>>> look at the State API to process across bundles; more questions about
> > >>>> this to come).
> > >>>> My comments/questions are inline:
> > >>>>
> > >>>>
> > >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> > >>>>
> > >>>>> We should start by understanding the goals. If elements are in
> > >>>>> different windows, can they be put in the same batch? If they have
> > >>>>> different timestamps, what timestamp should the batch have?
> > >>>>>
> > >>>> Regarding timestamps: the current design is as follows: the transform
> > >>>> does not group elements in the PCollection, so the "batch" does not
> > >>>> exist as an element in the PCollection. There is only a user-defined
> > >>>> function (perBatchFn) that gets called when batchSize elements have
> > >>>> been processed. This function takes an ArrayList as a parameter. So
> > >>>> elements keep their original timestamps.
> > >>>>
> > >>>>
> > >>>> Regarding windowing: I guess that if elements are not in the same
> > >>>> window, they are not expected to be in the same batch.
> > >>>> I'm just starting to work on these subjects, so I might lack a bit of
> > >>>> information; what I am currently thinking about is that I need a way
> > >>>> to know in the DoFn that the window has expired so that I can call the
> > >>>> perBatchFn even if batchSize is not reached. This is the
> > >>>> @OnWindowExpiration callback that Kenneth mentioned in an email about
> > >>>> bundles.
> > >>>> Let's imagine that we have a collection of elements artificially
> > >>>> timestamped every 10 seconds (for simplicity of the example) and a
> > >>>> fixed windowing of 1 minute. Then each window contains 6 elements. If
> > >>>> we were to buffer the elements in batches of 5 elements, then for each
> > >>>> window we expect to get 2 batches (one of 5 elements, one of 1
> > >>>> element). For that to happen, we need an @OnWindowExpiration on the
> > >>>> DoFn where we call perBatchFn.
> > >>>>
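Etienne's worked example can be checked with a small plain-Java simulation. It assumes in-order input, zero allowed lateness, and a watermark equal to the latest element timestamp, none of which real runners guarantee; `WindowedBatching` is hypothetical and only counts elements instead of calling a real perBatchFn. Each emitted batch is reported as `windowStart:size`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of batching per fixed window with a flush at window expiry.
// Assumes in-order input, zero allowed lateness, and watermark == latest element timestamp.
final class WindowedBatching {
    static List<String> simulate(long[] timestampsMillis, long windowMillis, int batchSize) {
        List<String> batches = new ArrayList<>();      // "windowStart:size" per emitted batch
        Map<Long, Integer> buffered = new TreeMap<>();  // windowStart -> buffered element count
        for (long ts : timestampsMillis) {
            // Watermark advances to ts: expire windows whose maxTimestamp (end - 1) is before ts.
            expire(buffered, batches, windowMillis, ts);
            long start = Math.floorDiv(ts, windowMillis) * windowMillis;
            int count = buffered.merge(start, 1, Integer::sum);
            if (count == batchSize) {                   // batch full: this is where perBatchFn runs
                batches.add(start + ":" + count);
                buffered.put(start, 0);
            }
        }
        expire(buffered, batches, windowMillis, Long.MAX_VALUE); // end of input: all windows expire
        return batches;
    }

    // Flushes the partial batch of every expired window (the @OnWindowExpiration role).
    private static void expire(Map<Long, Integer> buffered, List<String> batches,
                               long windowMillis, long watermark) {
        buffered.entrySet().removeIf(e -> {
            long maxTimestamp = e.getKey() + windowMillis - 1;
            if (maxTimestamp < watermark) {
                if (e.getValue() > 0) batches.add(e.getKey() + ":" + e.getValue());
                return true;
            }
            return false;
        });
    }
}
```

Twelve elements at 0, 10, ..., 110 s with 1-minute windows and batches of 5 produce `[0:5, 0:1, 60000:5, 60000:1]`: two batches per window, as expected above.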
> > >>>>> As a composite transform this will likely require a group by key
> > >>>>> which may affect performance. Maybe within a DoFn is better.
> > >>>>>
> > >>>> Yes, the processing is done with a DoFn indeed.
> > >>>>
> > >>>>> Then it could be some annotation or API that informs the runner.
> > >>>>> Should batch sizes be fixed in the annotation (element count or size)
> > >>>>> or should the user have some method that lets them decide when to
> > >>>>> process a batch based on the contents?
> > >>>>>
> > >>>> For now, the user passes batchSize as an argument to
> > >>>> BatchParDo.via(); it is a number of elements. But batching based on
> > >>>> content might be useful for the user. Giving a hint to the runner
> > >>>> might be more flexible for the runner.
> > >>>> Thanks.
> > >>>>
> > >>>>> Another thing to think about is whether this should be connected to
> > >>>>> the ability to run parts of the bundle in parallel.
> > >>>>>
> > >>>> Yes!
> > >>>>
> > >>>>> Maybe each batch is an RPC and you just want to start an async RPC
> > >>>>> for each batch. Then in addition to starting the final RPC in
> > >>>>> finishBundle, you also need to wait for all the RPCs to complete.
> > >>>>>
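Ben's async variant, sketched in plain Java with `CompletableFuture`: each full batch starts an asynchronous call, and finishBundle starts the last (possibly partial) one and then waits for all of them, so no call is still outstanding when the bundle is done. `AsyncBatcher` and the `rpc` function are hypothetical stand-ins for a real client; a send-and-forget perBatchFn, as Etienne describes, would simply skip the wait.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical bundle-scoped batcher that issues one asynchronous "RPC" per batch and
// blocks in finishBundle until every outstanding call has completed.
final class AsyncBatcher<T, R> {
    private final int batchSize;
    private final Function<List<T>, R> rpc;        // stand-in for a remote batch call
    private final ExecutorService executor;
    private final List<CompletableFuture<R>> inFlight = new ArrayList<>();
    private List<T> buffer = new ArrayList<>();

    AsyncBatcher(int batchSize, Function<List<T>, R> rpc, ExecutorService executor) {
        this.batchSize = batchSize;
        this.rpc = rpc;
        this.executor = executor;
    }

    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            startRpc();
        }
    }

    // Starts the final RPC, then waits for all RPCs of this bundle; join() rethrows failures.
    List<R> finishBundle() {
        startRpc();
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> call : inFlight) {
            results.add(call.join());
        }
        inFlight.clear();
        return results;
    }

    private void startRpc() {
        if (buffer.isEmpty()) return;
        List<T> batch = buffer;                     // hand the full buffer to the async call
        buffer = new ArrayList<>();
        inFlight.add(CompletableFuture.supplyAsync(() -> rpc.apply(batch), executor));
    }
}
```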
> > >>>> Actually, currently the processing of each batch is whatever the user
> > >>>> wants (the perBatchFn user-defined function). If the user decides to
> > >>>> issue an async RPC in that function (called with the ArrayList of
> > >>>> input elements), IMHO he is responsible for waiting for the response
> > >>>> in that method if he needs the response, but he can also do a
> > >>>> send-and-forget, depending on his use case.
> > >>>>
> > >>>> Besides, I have also included a perElementFn user function to allow
> > >>>> the user to do some processing on the elements before adding them to
> > >>>> the batch (example use case: convert a String to a DTO object to
> > >>>> invoke an external service).
> > >>>>
> > >>>> Etienne
> > >>>>
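A plain-Java sketch of how the two user functions could compose, assuming `java.util.function` types for perElementFn and perBatchFn (the actual signatures in the branch may differ). `BatchingHooks` is hypothetical, and calling `flush()` at window expiry is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical shape of the user hooks: perElementFn transforms each input before it is
// buffered; perBatchFn receives each full batch, or the final partial one on flush().
final class BatchingHooks<InputT, ElementT> {
    private final int batchSize;
    private final Function<InputT, ElementT> perElementFn;
    private final Consumer<List<ElementT>> perBatchFn;
    private final List<ElementT> buffer = new ArrayList<>();

    BatchingHooks(int batchSize, Function<InputT, ElementT> perElementFn,
                  Consumer<List<ElementT>> perBatchFn) {
        this.batchSize = batchSize;
        this.perElementFn = perElementFn;
        this.perBatchFn = perBatchFn;
    }

    void add(InputT input) {
        buffer.add(perElementFn.apply(input));
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called when the batch is full, and by the caller at window expiry for the remainder.
    void flush() {
        if (buffer.isEmpty()) return;
        perBatchFn.accept(new ArrayList<>(buffer)); // pass a copy so the user may keep it
        buffer.clear();
    }
}
```

For instance, with `String::length` as perElementFn and a batch size of 2, the inputs "a", "bb", "ccc" reach perBatchFn as `[1, 2]` and then, on flush, `[3]`.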
> > >>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Hi JB,
> > >>>>>
> > >>>>> I meant a JIRA vote, but a discussion on the ML works too :)
> > >>>>>
> > >>>>> As I understand the need (see the StackOverflow links in the JIRA
> > >>>>> ticket), the aim is to avoid the user having to code the batching
> > >>>>> logic in his own DoFn.processElement() and DoFn.finishBundle()
> > >>>>> regardless of the bundles. For example, a possible use case is to
> > >>>>> batch calls to an external service (for performance).
> > >>>>>
> > >>>>> I was thinking about providing a PTransform that implements the
> > >>>>> batching in its own DoFn and that takes user-defined functions for
> > >>>>> customization.
> > >>>>>
> > >>>>> Etienne
> > >>>>>
> > >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> > >>>>>
> > >>>>>> Hi
> > >>>>>>
> > >>>>>> I guess you mean a discussion on the mailing list about that, right?
> > >>>>>>
> > >>>>>> AFAIR the idea is to provide a utility class to deal with
> > >>>>>> pooling/batching. However, I'm not sure it's required, given
> > >>>>>> @StartBundle etc. in DoFn, and batching depends on the end user's
> > >>>>>> "logic".
> > >>>>>
> > >>>>>> Regards
> > >>>>>> JB
> > >>>>>>
> > >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot <echauc...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> I have started to work on this ticket
> > >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
> > >>>>>>>
> > >>>>>>> As there have been no votes since March 18th, is the issue still
> > >>>>>>> relevant/needed?
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>>
> > >>>>>>> Etienne
> > >>>>>>>
> > >>
> >
>
