Hi Etienne,

If the timer is firing n times for n elements, that's a bug in the runner /
shared runner code. It should be deduped. Which runner? Can you file a JIRA
against me to investigate? I'm still in the process of fleshing out more
and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add
one for this (existing tests already OOMed without deduping, so it wasn't at
the top of my priority list).
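
To make "deduped" concrete, here is a minimal plain-Java sketch of the
bookkeeping I have in mind. It is only a model under my own assumptions, not
the code of any runner: TimerTable, set, pendingCount and fireTime are
hypothetical names, and windows are represented as plain strings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of timer bookkeeping; not Beam's actual implementation.
final class TimerTable {
    // One pending fire time per (key, window) scope.
    private final Map<List<String>, Long> pending = new HashMap<>();

    // Setting a timer again for the same scope overwrites the previous one.
    void set(String key, String window, long fireAtMillis) {
        pending.put(Arrays.asList(key, window), fireAtMillis);
    }

    // Number of timers that would eventually fire.
    int pendingCount() {
        return pending.size();
    }

    // Fire time currently registered for a scope, or null if none.
    Long fireTime(String key, String window) {
        return pending.get(Arrays.asList(key, window));
    }
}
```

With this model, calling set once per element for n elements of the same
key/window still leaves a single pending timer, so the callback would run once.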

If the end user doesn't have a natural key, I would just add one and remove
it within your transform. Not sure how easy this will be - you might need
user intervention. Of course, you still do need to shard or you'll be
processing the whole PCollection serially.
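
As a rough illustration of "add a key, then remove it", here is a plain-Java
sketch with no Beam dependency. Everything in it is an assumption for the sake
of the example: ShardKeys, withShardKeys, dropKeys and numShards are
hypothetical names, and hashing the element is just one possible way to pick a
shard.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: attaches a synthetic shard key and strips it again.
final class ShardKeys {
    // Pairs each element with a key in [0, numShards), in input order.
    static <T> List<Map.Entry<Integer, T>> withShardKeys(List<T> elements, int numShards) {
        List<Map.Entry<Integer, T>> keyed = new ArrayList<>();
        for (T element : elements) {
            // floorMod keeps the shard non-negative even for negative hash codes.
            int shard = Math.floorMod(Objects.hashCode(element), numShards);
            keyed.add(new SimpleEntry<>(shard, element));
        }
        return keyed;
    }

    // Removes the synthetic keys so the caller never sees them.
    static <T> List<T> dropKeys(List<Map.Entry<Integer, T>> keyed) {
        List<T> values = new ArrayList<>();
        for (Map.Entry<Integer, T> entry : keyed) {
            values.add(entry.getValue());
        }
        return values;
    }
}
```

With numShards = 1 everything lands on one key and is handled serially, which
is why the number of shards still matters.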

Kenn

On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi
>
> AFAIR the timer per function is in the "roadmap" (remembering discussion
> we had with Kenn).
>
> I will take a deeper look next week on your branch.
>
> Regards
> JB
>
> On Feb 8, 2017, 13:28, at 13:28, Etienne Chauchot <echauc...@gmail.com>
> wrote:
> >Hi Kenn,
> >
> >I have started using state and timer APIs, they seem awesome!
> >
> >Please take a look at
> >https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
> >
> >It contains a PTransform that batches across bundles while respecting
> >windows (the tests are not finished yet; see the @Ignore annotations and
> >TODOs).
> >
> >I have some questions:
> >
> >- I use the timer to detect the end of the window like you suggested.
> >But the timer can only be set in @ProcessElement and @OnTimer. Javadoc
> >says that timers are implicitly scoped to a key/window and that a timer
> >can be set only for a single time per scope. I noticed that if I call
> >timer.setForNowPlus in the @ProcessElement method, it seems that the
> >timer is set n times for n elements. So I just created a boolean state
> >to prevent setting the timer more than once per key/window.
> >=> Would it be good to have an end-user way of indicating that the
> >timer will be set only once per key/window? Something analogous to
> >@Setup, to avoid the user having to use a state boolean?
> >
> >- I understand that state and timers need to be per-key, but what if the
> >end user does not need a key (let's say he just needs a
> >PCollection<String>)? Do we then tell him to use a PCollection<KV> anyway,
> >like I wrote in the javadoc of BatchingParDo?
> >
> >WDYT?
> >
> >Thanks,
> >
> >Etienne
> >
> >
> >On 26/01/2017 at 17:28, Etienne Chauchot wrote:
> >> Wonderful !
> >>
> >> Thanks Kenn !
> >>
> >> Etienne
> >>
> >>
> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
> >>> Hi Etienne,
> >>>
> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >>> arrived. I thought I would try to quickly unblock you by responding
> >>> with a TL;DR: you can achieve your goals with state & timers as they
> >>> currently exist. You'll set a timer for
> >>> window.maxTimestamp().plus(allowedLateness) precisely - when this timer
> >>> fires, you are guaranteed that the input watermark has exceeded this
> >>> point (so all new data is droppable) while the output timestamp is held
> >>> to this point (so you can safely output into the window).
> >>>
> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >>> handle on the allowed lateness (not a problem in your case) and (2)
> >>> actually meaningful and potentially less expensive to implement in the
> >>> absence of state (this is why it needs a design discussion at all,
> >>> really).
> >>>
> >>> Caveat: these APIs are new and not supported in every runner and
> >>> windowing configuration.
> >>>
> >>> Kenn
> >>>
> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I have started to implement this ticket. For now it is implemented as
> >>>> a PTransform that simply does ParDo.of(new DoFn), and all the
> >>>> processing related to batching is done in the DoFn.
> >>>>
> >>>> I'm starting to deal with windows and bundles (starting to take a look
> >>>> at the State API to process trans-bundles, more questions about this to
> >>>> come).
> >>>> My comments/questions are inline:
> >>>>
> >>>>
> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
> >>>>
> >>>>> We should start by understanding the goals. If elements are in
> >>>>> different windows, can they be put in the same batch? If they have
> >>>>> different timestamps, what timestamp should the batch have?
> >>>>>
> >>>> Regarding timestamps: the current design is as follows. The transform
> >>>> does not group elements in the PCollection, so the "batch" does not
> >>>> exist as an element in the PCollection. There is only a user-defined
> >>>> function (perBatchFn) that gets called when batchSize elements have
> >>>> been processed. This function takes an ArrayList as parameter, so
> >>>> elements keep their original timestamps.
> >>>>
> >>>>
> >>>> Regarding windowing: I guess that if elements are not in the same
> >>>> window, they are not expected to be in the same batch.
> >>>> I'm just starting to work on these subjects, so I might lack a bit of
> >>>> information; what I am currently thinking about is that I need a way to
> >>>> know in the DoFn that the window has expired so that I can call the
> >>>> perBatchFn even if batchSize is not reached. This is the
> >>>> @OnWindowExpiration callback that Kenneth mentioned in an email about
> >>>> bundles.
> >>>> Let's imagine that we have a collection of elements artificially
> >>>> timestamped every 10 seconds (for simplicity of the example) and a
> >>>> fixed windowing of 1 minute. Then each window contains 6 elements. If
> >>>> we were to buffer the elements in batches of 5 elements, then for each
> >>>> window we expect to get 2 batches (one of 5 elements, one of 1
> >>>> element). For that to happen, we need an @OnWindowExpiration on the
> >>>> DoFn where we call perBatchFn.
> >>>>
> >>>>> As a composite transform this will likely require a group by key,
> >>>>> which may affect performance. Maybe within a DoFn is better.
> >>>>>
> >>>> Yes, the processing is done with a DoFn indeed.
> >>>>
> >>>>> Then it could be some annotation or API that informs the runner.
> >>>>> Should batch sizes be fixed in the annotation (element count or size)
> >>>>> or should the user have some method that lets them decide when to
> >>>>> process a batch based on the contents?
> >>>>>
> >>>> For now, the user passes batchSize as an argument to
> >>>> BatchParDo.via(); it is a number of elements. But batching based on
> >>>> content might be useful for the user. Giving a hint to the runner
> >>>> might be more flexible for the runner.
> >>>> Thanks.
> >>>>
> >>>>> Another thing to think about is whether this should be connected to
> >>>>> the ability to run parts of the bundle in parallel.
> >>>>>
> >>>> Yes!
> >>>>
> >>>>> Maybe each batch is an RPC and you just want to start an async RPC for
> >>>>> each batch. Then, in addition to starting the final RPC in
> >>>>> finishBundle, you also need to wait for all the RPCs to complete.
> >>>>>
> >>>> Actually, currently each batch processing is whatever the user wants
> >>>> (perBatchFn user-defined function). If the user decides to issue an
> >>>> async RPC in that function (called with the ArrayList of input
> >>>> elements), IMHO he is responsible for waiting for the response in that
> >>>> method if he needs the response, but he can also do a send and forget,
> >>>> depending on his use case.
> >>>>
> >>>> Besides, I have also included a perElementFn user function to allow
> >>>> the user to do some processing on the elements before adding them to
> >>>> the batch (example use case: convert a String to a DTO object to invoke
> >>>> an external service).
> >>>>
> >>>> Etienne
> >>>>
> >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot<echauc...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>> Hi JB,
> >>>>>
> >>>>> I meant jira vote but discussion on the ML works also :)
> >>>>>
> >>>>> As I understand the need (see stackoverflow links in jira ticket), the
> >>>>> aim is to avoid the user having to code the batching logic in his own
> >>>>> DoFn.processElement() and DoFn.finishBundle() regardless of the
> >>>>> bundles. For example, a possible use case is to batch calls to an
> >>>>> external service (for performance).
> >>>>>
> >>>>> I was thinking about providing a PTransform that implements the
> >>>>> batching in its own DoFn and that takes user-defined functions for
> >>>>> customization.
> >>>>>
> >>>>> Etienne
> >>>>>
> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
> >>>>>
> >>>>>> Hi
> >>>>>>
> >>>>>> I guess you mean discussion on the mailing list about that, right?
> >>>>>>
> >>>>>> AFAIR the idea is to provide a utility class to deal with
> >>>>>> pooling/batching. However, not sure it's required, as we have
> >>>>>> @StartBundle etc. in DoFn and batching depends on the end user
> >>>>>> "logic".
> >>>>>
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot
> >>>>>> <echauc...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>> I have started to work on this ticket
> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
> >>>>>>>
> >>>>>>> As there has been no vote since March 18th, is the issue still
> >>>>>>> relevant/needed?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> Etienne
> >>>>>>>
> >>
>
