Great to know you got it working on Dataflow so easily, Reuven. As a new
feature it looks great!

I agree with Kenn: it is probably worth opening a new thread to discuss the
changes still needed to support this in portable runners.

On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:

> I think the (lack of) portability bit may have been buried in this thread.
> Maybe a new thread about the design for that?
>
> Kenn
>
> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>
>> FYI, this is now fixed for Dataflow. I also added better rejection so
>> that runners that don't support this feature will reject the pipeline.
>>
>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>
>>> A larger question is how to support this in the portability layer. Right
>>> now portability assumes that each timer id corresponds to a logical input
>>> PCollection, but that assumption no longer works as we now support a
>>> dynamic set of timers, each with their own id. We could instead model each
>>> timer family as a PCollection, but the FnApiRunner would need to
>>> dynamically get the timer id in order to invoke it, and today it statically
>>> reads the timer id from the PCollection name.
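
To make the dispatch problem above concrete, here is a minimal sketch in plain Java. It assumes a hypothetical design in which one callback is registered per timer family and the dynamic timer id travels with the fired timer itself, rather than being derived from a stream name. None of these class or method names come from Beam or the FnApiRunner; they are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical dispatch sketch; these are not Beam or FnApiRunner APIs. */
final class TimerDispatchSketch {

    /** A fired timer that carries its own dynamic id (an assumption). */
    static final class FiredTimer {
        final String familyId;
        final String timerId;

        FiredTimer(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }
    }

    // One callback per timer family, known statically when the DoFn is set up.
    private final Map<String, Consumer<String>> familyCallbacks = new HashMap<>();

    void registerFamily(String familyId, Consumer<String> callback) {
        familyCallbacks.put(familyId, callback);
    }

    /** Finds the callback by family, then hands it the dynamic timer id. */
    void fire(FiredTimer timer) {
        Consumer<String> callback = familyCallbacks.get(timer.familyId);
        if (callback == null) {
            throw new IllegalArgumentException(
                "Unknown timer family: " + timer.familyId);
        }
        callback.accept(timer.timerId);
    }

    public static void main(String[] args) {
        TimerDispatchSketch dispatcher = new TimerDispatchSketch();
        dispatcher.registerFamily(
            "timers", id -> System.out.println("Timer fired. id: " + id));
        // Two different dynamic ids reach the same family callback.
        dispatcher.fire(new FiredTimer("timers", "mainTimer"));
        dispatcher.fire(new FiredTimer("timers", "actionTypeClick"));
    }
}
```

The point of the sketch is only that the lookup key (the family) is static while the timer id is data, which is the part a name-based lookup cannot express.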
>>>
>>> Reuven
>>>
>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>> something I should've noticed in the code review.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>> TimersMap related tests were not running for all runners because they
>>>>> were not tagged as part of the ValidatesRunner category. I opened a PR
>>>>> [1] to enable this, so could someone please help me with the
>>>>> review/merge?
>>>>>
>>>>> I took a look out of curiosity and discovered that they are only passing
>>>>> for the Direct runner and for the classic Flink runner in batch mode.
>>>>> They are not passing for Dataflow [2][3] or for the Portable Flink
>>>>> runner, so it is probably worth reopening the issue to investigate and
>>>>> fix.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>
>>>>>
>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>> fairly trivial.
>>>>>>
>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The Flink Runner used to allow a timer to be set multiple times
>>>>>>> before we made it comply with the Beam semantics of overwriting past
>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>> multiple times. To fix this for Beam, the Flink Runner has to
>>>>>>> maintain a checkpointed map which sits outside of its built-in
>>>>>>> TimerService.
>>>>>>>
>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>> supported in the Flink Runner because the map does not take the
>>>>>>> family name into account. This can be easily fixed though.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>> > 1-line change).
>>>>>>> >
>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK to
>>>>>>> > provide an API that allows users to access this feature.
>>>>>>> >
>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>> > timer family. Beam semantics say that if a timer is set twice with
>>>>>>> > the same id, then the second timer overwrites the first. Several
>>>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>>>> > the timer family scopes the timers, we now allow two timers with
>>>>>>> > the same id as long as the timer families are different. Runners
>>>>>>> > had to be updated to include the timer family id in the map keys.
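
As a rough illustration of the map-key change described above, the following plain-Java sketch keys a timer store by (family id, timer id). It is not Beam's TimerInternals or any runner's real code; the class name `FamilyScopedTimers`, its methods, and the use of epoch milliseconds for firing times are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical runner-side timer store; not Beam's actual TimerInternals. */
final class FamilyScopedTimers {

    /** Composite key: a timer is identified by (family id, timer id). */
    static final class Key {
        final String familyId;
        final String timerId;

        Key(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return familyId.equals(other.familyId)
                && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Firing time in epoch millis, keyed by (family id, timer id).
    private final Map<Key, Long> timers = new HashMap<>();

    /** Setting a timer again under the same key overwrites the earlier one. */
    void set(String familyId, String timerId, long fireAtMillis) {
        timers.put(new Key(familyId, timerId), fireAtMillis);
    }

    /** Returns the pending firing time, or null if no such timer is set. */
    Long get(String familyId, String timerId) {
        return timers.get(new Key(familyId, timerId));
    }

    int size() {
        return timers.size();
    }

    public static void main(String[] args) {
        FamilyScopedTimers store = new FamilyScopedTimers();
        store.set("familyA", "t1", 100L);
        store.set("familyA", "t1", 200L); // same family and id: overwrites
        store.set("familyB", "t1", 300L); // same id, other family: coexists
        System.out.println(store.get("familyA", "t1")); // prints 200
        System.out.println(store.get("familyB", "t1")); // prints 300
        System.out.println(store.size());               // prints 2
    }
}
```

With a key of timer id alone, the third `set` call would have overwritten the `familyA` timer, which is the collision the family-aware key avoids.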
>>>>>>> >
>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>> > overwriting timers with the same id?
>>>>>>> >
>>>>>>> > Reuven
>>>>>>> >
>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>> >
>>>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>>>> >
>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>> >     - What do other runners need to implement to support this (e.g. Spark)?
>>>>>>> >
>>>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>>>> >
>>>>>>> >     Regards,
>>>>>>> >     Ismaël
>>>>>>> >
>>>>>>> >
>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>> >     <rehman.murad...@venturedive.com> wrote:
>>>>>>> >
>>>>>>> >         Thank you, Reuven, for the guidance throughout the
>>>>>>> >         development process. I am delighted to contribute my two
>>>>>>> >         cents to the Beam project.
>>>>>>> >
>>>>>>> >         Looking forward to more active contributions.
>>>>>>> >
>>>>>>> >         Thanks & Regards,
>>>>>>> >
>>>>>>> >         Rehman Murad Ali
>>>>>>> >         Software Engineer
>>>>>>> >         Mobile: +92 3452076766
>>>>>>> >         Skype: rehman.muradali
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
>>>>>>> >
>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>>>> >             dev list some time back.
>>>>>>> >
>>>>>>> >             As background, previously you had to statically declare
>>>>>>> >             all timers in your code. So if you wanted to have two
>>>>>>> >             timers, you needed to create two timer variables and two
>>>>>>> >             callbacks, one for each timer. A number of users kept
>>>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>>>> >             of timers (often based on the element), which was not
>>>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>>>> >             complicated.
>>>>>>> >
>>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>>>> >             callback. The use looks as follows:
>>>>>>> >
>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>> >                 // Declares a family of dynamic timers named "timers".
>>>>>>> >                 @TimerFamily("timers")
>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>> >
>>>>>>> >                 @ProcessElement
>>>>>>> >                 public void process(
>>>>>>> >                         @TimerFamily("timers") TimerMap timers,
>>>>>>> >                         @Element Type e,
>>>>>>> >                         @Timestamp Instant timestamp) {
>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>> >                 }
>>>>>>> >
>>>>>>> >                 // Called for every timer that fires in the "timers" family.
>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>> >                 }
>>>>>>> >             }
>>>>>>> >
>>>>>>> >             This currently works for the Flink and the Dataflow runners.
>>>>>>> >
>>>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>>>> >             find it very valuable.
>>>>>>> >
>>>>>>> >             Reuven
>>>>>>> >
>>>>>>>
>>>>>>
