Any progress on this?

On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot <[email protected]> wrote:
> Hi all,
>
> Kenn and I discussed point 1 below yesterday, and I would like
> to record the outcome here on the ML:
>
> Using the new method timer.set() instead of timer.setForNowPlus() makes the
> timer fire at the right time.
>
> Another thing, regarding point 2: if I inject the window into the @OnTimer
> method and print it, I see that at the moment the timer fires (at the last
> timestamp of the window), the window is the GlobalWindow. I guess that is
> because the fixed window has just ended. Maybe the empty bagState that I get
> there is due to the end of the window (passing to the GlobalWindow). I mean,
> as state is scoped per window and the window is different, another
> bagState instance gets injected, hence the empty bagState.
>
> WDYT?
>
> I will open a PR even though this work is not finished yet; that way we will
> have a convenient place to discuss this code.
>
> Etienne
>
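Regarding point 1 in the quoted message: here is a minimal sketch, in plain Java with no Beam dependency, of the absolute time an end-of-window timer would target in the 10-element test quoted further down. The names WindowTimerSketch, windowStart, maxTimestamp and timerTarget are made up for illustration and are not Beam API. It assumes fixed windows aligned on the epoch and millisecond resolution.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: computes where an end-of-window timer should fire. */
final class WindowTimerSketch {

    /** Start of the fixed window containing the timestamp (windows aligned on the epoch). */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    /** Last timestamp still inside the window: exclusive end minus one millisecond. */
    static Instant maxTimestamp(Instant windowStart, Duration size) {
        return windowStart.plus(size).minusMillis(1);
    }

    /** Absolute firing time: end of the element's window plus the allowed lateness. */
    static Instant timerTarget(Instant elementTimestamp, Duration size, Duration allowedLateness) {
        return maxTimestamp(windowStart(elementTimestamp, size), size).plus(allowedLateness);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        // Ten elements, one per second starting at the epoch, as in the test.
        for (int i = 0; i < 10; i++) {
            Instant ts = Instant.ofEpochSecond(i);
            System.out.println(ts + " -> timer at " + timerTarget(ts, size, Duration.ZERO));
        }
    }
}
```

With 5s windows and no allowed lateness, this gives 1970-01-01T00:00:04.999Z for the first five elements and 1970-01-01T00:00:09.999Z for the rest, which are the firing times expected in the thread. Because the target is absolute, it does not depend on when the timer happens to be set.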
>
> On 3 Mar 2017 at 11:48, Etienne Chauchot wrote:
>>
>> Hi all,
>>
>> @Kenn: I have enhanced my streaming test in
>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO in particular
>> to check that BatchingParDo doesn't mess up windows. It seems that it
>> actually does :)
>>
>> The input collection contains 10 elements timestamped at 1s intervals, and
>> it is divided into fixed windows of 5s duration (so 2 windows). The start
>> time is the epoch. The timer is used to detect the end of the window and
>> then output the content of the batch (buffer).
>>
>> I added some logs and I noticed two strange things (that might be linked):
>>
>>
>> 1- The timer is set twice, and it is set correctly:
>>
>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>> 1970-01-01T00:00:00.000Z set for window
>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>
>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>> 1970-01-01T00:00:05.000Z set for window
>> [1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)
>>
>> It correctly fires twice, but not at the right timestamp:
>>
>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>> 1970-01-01T00:00:04.999Z
>>
>> =>Correct
>>
>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>> 1970-01-01T00:00:04.999Z
>>
>> => Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)
>>
>> Do I need to call timer.cancel() after the timer has fired? But
>> timer.cancel() is not supported by DirectRunner yet.
>>
>>
>>
>> 2- In the @OnTimer method, the injected batch bagState parameter is empty
>> even though some elements were added to it since the last batch.clear()
>> while processing the same window:
>>
>> INFOS: ***** BATCH ***** clear
>>
>> INFOS: ***** BATCH ***** Add element for window
>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>
>> INFOS: ***** BATCH ***** Add element for window
>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>> ..
>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>> 1970-01-01T00:00:04.999Z
>> INFOS: ***** IN ONTIMER ***** batch size 0
>>
>> Am I doing something wrong with timers or is there something not totally
>> finished with them (as you noticed they are quite new)?
>>
>> WDYT?
>>
>>
>> Thanks
>>
>> Etienne
>>
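To illustrate the hypothesis about point 2 (state being scoped per window), here is a toy model in plain Java. It is not how any runner actually stores state. WindowScopedStateSketch and bagFor are hypothetical names, and windows are represented as plain strings. It only shows that if bags are looked up by (key, window), reading under a different window than the one used for writing yields a fresh, empty bag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a state table holding one bag per (key, window) pair. */
final class WindowScopedStateSketch {

    // The map key is the pair [key, window]; lists compare by content.
    private final Map<List<String>, List<String>> bags = new HashMap<>();

    /** Returns the bag for this key and window, creating an empty one on first access. */
    List<String> bagFor(String key, String window) {
        return bags.computeIfAbsent(Arrays.asList(key, window), k -> new ArrayList<>());
    }

    public static void main(String[] args) {
        WindowScopedStateSketch state = new WindowScopedStateSketch();
        String fixedWindow = "[1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)";

        // Elements are buffered while processing the fixed window.
        state.bagFor("key", fixedWindow).add("a");
        state.bagFor("key", fixedWindow).add("b");

        // Reading with the same window sees both elements.
        System.out.println("same window: " + state.bagFor("key", fixedWindow).size());
        // Reading with another window (here a stand-in for the GlobalWindow) sees none.
        System.out.println("other window: " + state.bagFor("key", "GlobalWindow").size());
    }
}
```

If the timer callback really runs in a different window than the one the elements were added in, this would be consistent with the "batch size 0" log line, but that remains a guess until checked against the runner.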
>>
>>> On 9 Feb 2017 at 09:55, Etienne Chauchot wrote:
>>>
>>> Hi,
>>>
>>> @JB: good to know for the roadmap! thanks
>>>
>>> @Kenn: just to be clear: the timer fires fine. What I noticed is that it
>>> seems to be SET more than once because timer.setForNowPlus is called in the
>>> @ProcessElement method. I am not 100% sure of it; what I noticed is that it
>>> started to work fine once I made sure to call timer.setForNowPlus only
>>> once. I'm not saying it's a bug, it is just not what I understood when I
>>> read the javadoc: I understood that it would be set only once per window,
>>> see the javadoc below:
>>>
>>> An implementation of Timer is implicitly scoped - it may be scoped to a
>>> key and window, or a key, window, and trigger, etc.
>>> A timer exists in one of two states: set or unset. A timer can be set
>>> only for a single time per scope.
>>>
>>> I use the DirectRunner.
>>>
>>> For the key part: ok, makes sense.
>>>
>>> Thanks for your comments
>>>
>>> I'm leaving on vacation tonight for a little more than two weeks, I'll
>>> resume work then, maybe start a PR when it's ready.
>>>
>>> Etienne
>>>
>>>
>>>
>>> On 8 Feb 2017 at 19:48, Kenneth Knowles wrote:
>>>>
>>>> Hi Etienne,
>>>>
>>>> If the timer is firing n times for n elements, that's a bug in the
>>>> runner / shared runner code. It should be deduped. Which runner? Can you
>>>> file a JIRA against me to investigate? I'm still in the process of
>>>> fleshing out more and more RunnableOnService (aka ValidatesRunner) tests
>>>> so I will surely add one (existing tests already OOMed without deduping,
>>>> so it wasn't at the top of my priority list).
>>>>
>>>> If the end user doesn't have a natural key, I would just add one and
>>>> remove it within your transform. Not sure how easy this will be - you
>>>> might need user intervention. Of course, you still do need to shard or
>>>> you'll be processing the whole PCollection serially.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> AFAIR the timer per function is on the "roadmap" (recalling the
>>>>> discussion we had with Kenn).
>>>>>
>>>>> I will take a deeper look at your branch next week.
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On Feb 8, 2017, at 13:28, Etienne Chauchot <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> Hi Kenn,
>>>>>>
>>>>>> I have started using state and timer APIs, they seem awesome!
>>>>>>
>>>>>> Please take a look at
>>>>>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>>>>>>
>>>>>> It contains a PTransform that batches across bundles while
>>>>>> respecting windows (the tests are not finished yet, see the @Ignore
>>>>>> annotations and TODOs).
>>>>>>
>>>>>> I have some questions:
>>>>>>
>>>>>> - I use the timer to detect the end of the window like you suggested.
>>>>>> But the timer can only be set in @ProcessElement and @OnTimer. The
>>>>>> javadoc says that timers are implicitly scoped to a key/window and that
>>>>>> a timer can be set only for a single time per scope. I noticed that if I
>>>>>> call timer.setForNowPlus in the @ProcessElement method, it seems that
>>>>>> the timer is set n times for n elements. So I just created a boolean
>>>>>> state to prevent setting the timer more than once per key/window.
>>>>>> => Would it be good to have a way for the end user to indicate that the
>>>>>> timer will be set only once per key/window? Something analogous to
>>>>>> @Setup, to avoid the user having to use a state boolean?
>>>>>>
>>>>>> - I understand that state and timers need to be per-key, but what if
>>>>>> the end user does not need a key (let's say he just needs a
>>>>>> PCollection<String>)? Do we tell him to use a PCollection<KV> anyway,
>>>>>> like I wrote in the javadoc of BatchingParDo?
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>>
>>>>>> On 26 Jan 2017 at 17:28, Etienne Chauchot wrote:
>>>>>>>
>>>>>>> Wonderful !
>>>>>>>
>>>>>>> Thanks Kenn !
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>>
>>>>>>> On 26 Jan 2017 at 15:34, Kenneth Knowles wrote:
>>>>>>>>
>>>>>>>> Hi Etienne,
>>>>>>>>
>>>>>>>> I was drafting a proposal about @OnWindowExpiration when this email
>>>>>>>> arrived. I thought I would try to quickly unblock you by responding
>>>>>>>> with a TL;DR: you can achieve your goals with state & timers as they
>>>>>>>> currently exist. You'll set a timer for
>>>>>>>> window.maxTimestamp().plus(allowedLateness) precisely - when this
>>>>>>>> timer fires, you are guaranteed that the input watermark has exceeded
>>>>>>>> this point (so all new data is droppable) while the output timestamp
>>>>>>>> is held to this point (so you can safely output into the window).
>>>>>>>>
>>>>>>>> @OnWindowExpiration is (1) a convenience to save you from needing a
>>>>>>>> handle on the allowed lateness (not a problem in your case) and (2)
>>>>>>>> actually meaningful and potentially less expensive to implement in the
>>>>>>>> absence of state (this is why it needs a design discussion at all,
>>>>>>>> really).
>>>>>>>>
>>>>>>>> Caveat: these APIs are new and not supported in every runner and
>>>>>>>> windowing configuration.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have started to implement this ticket. For now it is implemented as
>>>>>>>>> a PTransform that simply does ParDo.of(new DoFn), and all the
>>>>>>>>> processing related to batching is done in the DoFn.
>>>>>>>>>
>>>>>>>>> I'm starting to deal with windows and bundles (starting to take a
>>>>>>>>> look at the State API to process across bundles, more questions about
>>>>>>>>> this to come).
>>>>>>>>> My comments/questions are inline:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 17 Jan 2017 at 18:41, Ben Chambers wrote:
>>>>>>>>>
>>>>>>>>>> We should start by understanding the goals. If elements are in
>>>>>>>>>> different windows, can they be put in the same batch? If they have
>>>>>>>>>> different timestamps, what timestamp should the batch have?
>>>>>>>>>>
>>>>>>>>> Regarding timestamps: the current design is as follows. The transform
>>>>>>>>> does not group elements in the PCollection, so the "batch" does not
>>>>>>>>> exist as an element in the PCollection. There is only a user-defined
>>>>>>>>> function (perBatchFn) that gets called when batchSize elements have
>>>>>>>>> been processed. This function takes an ArrayList as its parameter, so
>>>>>>>>> elements keep their original timestamps.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regarding windowing: I guess that if elements are not in the same
>>>>>>>>> window, they are not expected to be in the same batch.
>>>>>>>>> I'm just starting to work on these subjects, so I might lack a bit of
>>>>>>>>> information. What I am currently thinking is that I need a way to
>>>>>>>>> know in the DoFn that the window has expired, so that I can call
>>>>>>>>> perBatchFn even if batchSize has not been reached. This is the
>>>>>>>>> @OnWindowExpiration callback that Kenneth mentioned in an email about
>>>>>>>>> bundles.
>>>>>>>>> Let's imagine that we have a collection of elements artificially
>>>>>>>>> timestamped every 10 seconds (for simplicity of the example) and
>>>>>>>>> fixed windowing of 1 minute. Then each window contains 6 elements. If
>>>>>>>>> we were to buffer the elements in batches of 5, then for each window
>>>>>>>>> we would expect to get 2 batches (one of 5 elements, one of 1
>>>>>>>>> element). For that to happen, we need an @OnWindowExpiration method
>>>>>>>>> on the DoFn where we call perBatchFn.
>>>>>>>>>
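The 6-elements-per-window example above can be sketched in plain Java, with no Beam dependency. WindowedBatcherSketch, processElement and onWindowExpiration are hypothetical names that only mimic the shape of the DoFn being discussed; perBatchFn and batchSize come from the thread. The sketch assumes epoch-aligned fixed windows and that something else (a timer, in the real transform) signals the end of each window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Illustrative only: buffers elements per window, flushing on size or on window end. */
final class WindowedBatcherSketch {
    private final int batchSize;
    private final long windowSizeMs;
    private final Consumer<List<String>> perBatchFn;
    // One buffer per window, keyed by the window start in milliseconds.
    private final Map<Long, List<String>> buffers = new HashMap<>();

    WindowedBatcherSketch(int batchSize, long windowSizeMs, Consumer<List<String>> perBatchFn) {
        this.batchSize = batchSize;
        this.windowSizeMs = windowSizeMs;
        this.perBatchFn = perBatchFn;
    }

    /** Adds the element to its window's buffer and flushes once batchSize is reached. */
    void processElement(String element, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        List<String> buffer = buffers.computeIfAbsent(windowStart, k -> new ArrayList<>());
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush(windowStart);
        }
    }

    /** Called when the window ends: emits whatever is left, even a partial batch. */
    void onWindowExpiration(long windowStart) {
        flush(windowStart);
    }

    private void flush(long windowStart) {
        List<String> buffer = buffers.remove(windowStart);
        if (buffer != null && !buffer.isEmpty()) {
            perBatchFn.accept(buffer);
        }
    }

    public static void main(String[] args) {
        WindowedBatcherSketch batcher = new WindowedBatcherSketch(
            5, 60_000L, batch -> System.out.println("batch of " + batch.size()));
        // Six elements, one every 10 seconds, all in the first 1-minute window.
        for (int i = 0; i < 6; i++) {
            batcher.processElement("e" + i, i * 10_000L);
        }
        batcher.onWindowExpiration(0L);
    }
}
```

The first batch of 5 is emitted as soon as the fifth element arrives. The remaining single element is only emitted by the expiration call, which is the case that needs a window-end signal.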
>>>>>>>>>> As a composite transform this will likely require a group by key
>>>>>>>>>> which may affect performance. Maybe within a DoFn is better.
>>>>>>>>>>
>>>>>>>>> Yes, the processing is done with a DoFn indeed.
>>>>>>>>>
>>>>>>>>>> Then it could be some annotation or API that informs the runner.
>>>>>>>>>> Should batch sizes be fixed in the annotation (element count or
>>>>>>>>>> size), or should the user have some method that lets them decide
>>>>>>>>>> when to process a batch based on the contents?
>>>>>>>>>>
>>>>>>>>> For now, the user passes batchSize, a number of elements, as an
>>>>>>>>> argument to BatchParDo.via(). But batching based on content might be
>>>>>>>>> useful for the user, and giving a hint to the runner might leave the
>>>>>>>>> runner more flexibility.
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>>> Another thing to think about is whether this should be connected to
>>>>>>>>>> the ability to run parts of the bundle in parallel.
>>>>>>>>>>
>>>>>>>>> Yes!
>>>>>>>>>
>>>>>>>>>> Maybe each batch is an RPC and you just want to start an async RPC
>>>>>>>>>> for each batch. Then, in addition to starting the final RPC in
>>>>>>>>>> finishBundle, you also need to wait for all the RPCs to complete.
>>>>>>>>>>
>>>>>>>>> Actually, for now the processing of each batch is whatever the user
>>>>>>>>> wants (the user-defined perBatchFn function). If the user decides to
>>>>>>>>> issue an async RPC in that function (called with the ArrayList of
>>>>>>>>> input elements), IMHO he is responsible for waiting for the response
>>>>>>>>> in that method if he needs it, but he can also fire and forget,
>>>>>>>>> depending on his use case.
>>>>>>>>>
>>>>>>>>> Besides, I have also included a perElementFn user function to allow
>>>>>>>>> the user to do some processing on the elements before adding them to
>>>>>>>>> the batch (example use case: convert a String to a DTO object to
>>>>>>>>> invoke an external service).
>>>>>>>>>
>>>>>>>>> Etienne
>>>>>>>>>
>>>>>>>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi JB,
>>>>>>>>>>
>>>>>>>>>> I meant jira vote but discussion on the ML works also :)
>>>>>>>>>>
>>>>>>>>>> As I understand the need (see the Stack Overflow links in the JIRA
>>>>>>>>>> ticket), the aim is to spare the user from coding the batching logic
>>>>>>>>>> in his own DoFn.processElement() and DoFn.finishBundle(), regardless
>>>>>>>>>> of the bundles. For example, a possible use case is to batch calls
>>>>>>>>>> to an external service (for performance).
>>>>>>>>>>
>>>>>>>>>> I was thinking about providing a PTransform that implements the
>>>>>>>>>> batching in its own DoFn and that takes user-defined functions for
>>>>>>>>>> customization.
>>>>>>>>>>
>>>>>>>>>> Etienne
>>>>>>>>>>
>>>>>>>>>> On 17 Jan 2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi
>>>>>>>>>>>
>>>>>>>>>>> I guess you mean discussion on the mailing list about that, right?
>>>>>>>>>>>
>>>>>>>>>>> AFAIR the idea is to provide a utility class to deal with
>>>>>>>>>>> pooling/batching. However, I'm not sure it's required, given
>>>>>>>>>>> @StartBundle etc. in DoFn, and batching depends on the end user
>>>>>>>>>>> "logic".
>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>>
>>>>>>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have started to work on this ticket
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>>>>>>
>>>>>>>>>>>> As there have been no votes since March 18th, is the issue still
>>>>>>>>>>>> relevant/needed?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Etienne
>>>>>>>>>>>>
>>>
>>
>
