I think the (lack of) portability bit may have been buried in this thread.
Maybe a new thread about the design for that?

Kenn

On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:

> FYI, this is now fixed for Dataflow. I also added better rejection so that
> runners that don't support this feature will reject the pipeline.
>
> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>
>> I took a look, and I think this was a simple bug. Testing a fix now.
>>
>> A larger question is how to support this in the portability layer. Right
>> now portability assumes that each timer id corresponds to a logical input
>> PCollection, but that assumption no longer works as we now support a
>> dynamic set of timers, each with their own id. We could instead model each
>> timer family as a PCollection, but the FnApiRunner would need to
>> dynamically get the timer id in order to invoke it, and today it statically
>> reads the timer id from the PCollection name.
>>
>> Reuven
>>
>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>
>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>> indeed never ran on any runner except for the DirectRunner, which is
>>> something I should've noticed in the code review.
>>>
>>> Reuven
>>>
>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> I had a discussion with Rehman last week and we discovered that the
>>>> TimerMap-related tests were not running for all runners because they
>>>> were not tagged as part of the ValidatesRunner category. I opened a PR [1]
>>>> to enable this, so could someone please help me with the review/merge?
>>>>
>>>> I took a look out of curiosity and discovered that they only pass for the
>>>> Direct runner and for the classic Flink runner in batch mode. They do not
>>>> pass for Dataflow [2][3] or for the portable Flink runner, so it is
>>>> probably worth reopening the issue to investigate and fix.
>>>>
>>>> [1] https://github.com/apache/beam/pull/10747
>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>
>>>>
>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>> fairly trivial.
>>>>>
>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>> made it comply with the Beam semantics of overwriting past invocations.
>>>>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>>>>> and Spark themselves allow a timer to be set multiple times. To fix
>>>>>> this for Beam, the Flink Runner has to maintain a checkpointed map that
>>>>>> sits outside of its built-in TimerService.
>>>>>>
>>>>>> As far as I can see, multiple timer families are currently not supported
>>>>>> in the Flink Runner because the map does not take the family name into
>>>>>> account. This can be easily fixed, though.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>> > The new timer family is in the portability protos. I think TimerReceiver
>>>>>> > needs to be updated to set it, though (I think a 1-line change).
>>>>>> >
>>>>>> > The TimerInternals class that runners implement today already handles
>>>>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>>>>> > API that allows users to access this feature.
>>>>>> >
>>>>>> > The main work needed in the runner was to take into account the timer
>>>>>> > family. Beam semantics say that if a timer is set twice with the same
>>>>>> > id, then the second timer overwrites the first. Several runners
>>>>>> > therefore had maps from timer id -> timer. However, since the
>>>>>> > timer family scopes the timers, we now allow two timers with the same id
>>>>>> > as long as the timer families are different. Runners had to be updated
>>>>>> > to include the timer family id in the map keys.
>>>>>> >
>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>>>>> > if this means that the Spark runner was incorrectly implementing the
>>>>>> > Beam semantics before, and setTimer was not overwriting timers with the
>>>>>> > same id?
>>>>>> >
>>>>>> > Reuven
>>>>>> >
>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]
>>>>>> > <mailto:[email protected]>> wrote:
>>>>>> >
>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>> >
>>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>>> >
>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>> >     - What do other runners need to implement to support this (e.g. Spark)?
>>>>>> >
>>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>>> >
>>>>>> >     Regards,
>>>>>> >     Ismaël
>>>>>> >
>>>>>> >
>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>> >     <[email protected]
>>>>>> >     <mailto:[email protected]>> wrote:
>>>>>> >
>>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>>> >         process. I am delighted to contribute my two cents to the Beam
>>>>>> >         project.
>>>>>> >
>>>>>> >         Looking forward to more active contributions.
>>>>>> >
>>>>>> >         Thanks & Regards
>>>>>> >
>>>>>> >         Rehman Murad Ali
>>>>>> >         Software Engineer
>>>>>> >         Mobile: +92 3452076766
>>>>>> >         Skype: rehman.muradali
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>> [email protected]
>>>>>> >         <mailto:[email protected]>> wrote:
>>>>>> >
>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>> >             dynamic timers. As a reminder, this was discussed on the dev
>>>>>> >             list some time back.
>>>>>> >
>>>>>> >             As background, previously you had to statically declare all
>>>>>> >             timers in your code. So if you wanted to have two timers,
>>>>>> >             you needed to create two timer variables and two callbacks,
>>>>>> >             one for each timer. A number of users kept hitting stumbling
>>>>>> >             blocks where they needed a dynamic set of timers (often
>>>>>> >             based on the element), which was not supported in Beam. The
>>>>>> >             workarounds were quite ugly and complicated.
>>>>>> >
>>>>>> >             The new support allows declaring a TimerMap, which is a map
>>>>>> >             of timers. Each TimerMap is scoped by a family name, so you
>>>>>> >             can create multiple TimerMaps, each with its own callback.
>>>>>> >             The usage looks as follows:
>>>>>> >
>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>> >                 // Declares a family of dynamically named event-time timers.
>>>>>> >                 @TimerFamily("timers")
>>>>>> >                 private final TimerSpec timerMap =
>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>> >
>>>>>> >                 @ProcessElement
>>>>>> >                 public void process(
>>>>>> >                         @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>> >                     // Timer ids are chosen at runtime.
>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>> >                 }
>>>>>> >
>>>>>> >                 // One callback serves every timer in the "timers" family.
>>>>>> >                 @OnTimerFamily("timers")
>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>> >                 }
>>>>>> >             }
>>>>>> >
>>>>>> >             This currently works for the Flink and the Dataflow runners.
>>>>>> >
>>>>>> >             Thank you Rehman for getting this done! Beam users will find
>>>>>> >             it very valuable.
>>>>>> >
>>>>>> >             Reuven
>>>>>> >
>>>>>>
>>>>>
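
The overwrite rule discussed in the thread (setting a timer twice with the
same id replaces the first, but timers in different families never collide)
can be illustrated with a small standalone sketch. This is not Beam or runner
code: TimerTable, Key, and the long fire times are hypothetical names chosen
for illustration, and a real runner would keep such a map in checkpointed
state rather than in a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch (not Beam code): a timer table keyed by (family id, timer id). */
final class TimerTable {

    /** Composite key: two timers collide only when both family and id match. */
    static final class Key {
        final String familyId;
        final String timerId;

        Key(String familyId, String timerId) {
            this.familyId = Objects.requireNonNull(familyId);
            this.timerId = Objects.requireNonNull(timerId);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return familyId.equals(other.familyId) && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    private final Map<Key, Long> fireTimes = new HashMap<>();

    /** Sets a timer and returns the fire time it overwrote, or null if there was none. */
    Long set(String familyId, String timerId, long fireTimeMillis) {
        return fireTimes.put(new Key(familyId, timerId), fireTimeMillis);
    }

    /** Returns the pending fire time, or null if no such timer is set. */
    Long get(String familyId, String timerId) {
        return fireTimes.get(new Key(familyId, timerId));
    }

    /** Number of distinct pending timers across all families. */
    int size() {
        return fireTimes.size();
    }

    public static void main(String[] args) {
        TimerTable table = new TimerTable();
        table.set("familyA", "mainTimer", 100L);
        table.set("familyA", "mainTimer", 200L); // same family and id: overwrites
        table.set("familyB", "mainTimer", 300L); // same id, other family: kept separately
        System.out.println(table.get("familyA", "mainTimer")); // prints 200
        System.out.println(table.get("familyB", "mainTimer")); // prints 300
        System.out.println(table.size());                      // prints 2
    }
}
```

Keying the map on the timer id alone would make the third call overwrite the
first timer, which is the situation Max describes for the Flink Runner before
the family name is taken into account.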
