+1 for portable implementation and design. Developing features with only
the non-portable implementation in mind means the portability effort gets
bogged down filling in features that were left partially completed.

On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <[email protected]> wrote:

> Apparently the Dynamic Timers fix for Dataflow broke the
> ValidatesRunner tests for Flink in batch mode, which were passing before.
> Reuven or Rehman, could you please take a look?
> The tests have been failing since the exact commit of the fix:
> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
> For details on the exception:
>
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>
>
> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <[email protected]> wrote:
>
>> Great to know you got it working on Dataflow easily, Reuven. As a new
>> feature it looks great!
>>
>> I agree with Kenn: it may be worth opening a new thread to discuss the
>> changes still needed to support this in portable runners.
>>
>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> I think the (lack of) portability bit may have been buried in this
>>> thread. Maybe a new thread about the design for that?
>>>
>>> Kenn
>>>
>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>> that runners that don't support this feature will reject the pipeline.
>>>>
>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>
>>>>> A larger question is how to support this in the portability layer.
>>>>> Right now portability assumes that each timer id corresponds to a
>>>>> logical input PCollection, but that assumption no longer holds now that
>>>>> we support a dynamic set of timers, each with its own id. We could
>>>>> instead model each timer family as a PCollection, but the FnApiRunner
>>>>> would then need to get the timer id dynamically in order to invoke it,
>>>>> and today it reads the timer id statically from the PCollection name.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>> something I should've noticed in the code review.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>> TimersMap-related tests were not running for all runners because they
>>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>>> PR [1] to enable this, so could someone please help with the
>>>>>>> review/merge?
>>>>>>>
>>>>>>> I took a look out of curiosity and discovered that they only pass for
>>>>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>>>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>>>>>> is probably worth reopening the issue to investigate and fix.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>> [2]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>> [3]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>>>> fairly trivial.
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>>>> multiple times. To fix this for Beam, the Flink Runner has to
>>>>>>>>> maintain a checkpointed map which sits outside of its built-in
>>>>>>>>> TimerService.
>>>>>>>>>
>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>> supported in the Flink Runner because the map does not take the
>>>>>>>>> family name into account. This can be easily fixed though.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>>> > 1-line change).
>>>>>>>>> >
>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK, to
>>>>>>>>> > provide an API that allows users to access this feature.
>>>>>>>>> >
>>>>>>>>> > The main work needed in the runners was to take the timer family
>>>>>>>>> > into account. Beam semantics say that if a timer is set twice with
>>>>>>>>> > the same id, then the second timer overwrites the first. Several
>>>>>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>>>>>> > the timer family scopes the timers, we now allow two timers with
>>>>>>>>> > the same id as long as the timer families are different. Runners
>>>>>>>>> > had to be updated to include the timer family id in the map keys.
>>>>>>>>> >
>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>> >
>>>>>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>>>>>> >
>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>> >     - What do other runners (e.g. Spark) need to implement to support this?
>>>>>>>>> >
>>>>>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>>>>>> >
>>>>>>>>> >     Regards,
>>>>>>>>> >     Ismaël
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>>>>>> >         process. I am delighted to contribute my two cents to the Beam
>>>>>>>>> >         project.
>>>>>>>>> >
>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>> >
>>>>>>>>> >         Thanks & Regards,
>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>> >         Software Engineer
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>>>>>> >             dev list some time back.
>>>>>>>>> >
>>>>>>>>> >             As background, previously you had to statically declare
>>>>>>>>> >             all timers in your code. So if you wanted to have two
>>>>>>>>> >             timers, you needed to create two timer variables and two
>>>>>>>>> >             callbacks, one for each timer. A number of users kept
>>>>>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>>>>>> >             of timers (often based on the element), which was not
>>>>>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>>>>>> >             complicated.
>>>>>>>>> >
>>>>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>>>>>> >             callback. Usage looks as follows:
>>>>>>>>> >
>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>> >                 // Declares a family of dynamically named timers.
>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>> >
>>>>>>>>> >                 @ProcessElement
>>>>>>>>> >                 public void process(
>>>>>>>>> >                         @TimerFamily("timers") TimerMap timers,
>>>>>>>>> >                         @Element Type e,
>>>>>>>>> >                         @Timestamp Instant timestamp) {
>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>> >                     // Timer ids can be computed from the element.
>>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>> >                 }
>>>>>>>>> >
>>>>>>>>> >                 // Single callback for every timer in the "timers" family.
>>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>> >                 }
>>>>>>>>> >             }
>>>>>>>>> >
>>>>>>>>> >             This currently works for the Flink and the Dataflow runners.
>>>>>>>>> >
>>>>>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>>>>>> >             find it very valuable.
>>>>>>>>> >
>>>>>>>>> >             Reuven
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
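
As a rough illustration of the runner-side change described in the quoted
thread (keying pending timers by timer family as well as timer id), here is
a minimal Java sketch. FamilyScopedTimers, Key, and their methods are
hypothetical names made up for this example; this is not the actual Flink or
Dataflow runner code, and a real runner would keep this state checkpointed
rather than in a plain in-memory map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch only; not taken from any Beam runner. */
final class FamilyScopedTimers {

  /** Composite key: a timer is unique per (family id, timer id), not per timer id alone. */
  static final class Key {
    final String familyId;
    final String timerId;

    Key(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  // Pending timers: composite key -> firing timestamp in milliseconds.
  private final Map<Key, Long> pending = new HashMap<>();

  /** Setting the same (family, id) again overwrites the earlier timer. */
  void set(String familyId, String timerId, long timestampMillis) {
    pending.put(new Key(familyId, timerId), timestampMillis);
  }

  /** Returns the pending timestamp, or null if no such timer is set. */
  Long get(String familyId, String timerId) {
    return pending.get(new Key(familyId, timerId));
  }

  int size() {
    return pending.size();
  }
}
```

With a key on timer id alone, setting timer "x" in family "b" would
overwrite timer "x" in family "a"; with the composite key the two coexist,
while a second set on the same family and id still replaces the first.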
