Great to know you got it working on Dataflow so easily, Reuven. As a new feature, it looks great!
Agree with Kenn; it may be worth opening a new thread to discuss the changes still needed to support this in portable runners.

On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:

> I think the (lack of) portability bit may have been buried in this thread.
> Maybe a new thread about the design for that?
>
> Kenn
>
> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>
>> FYI, this is now fixed for Dataflow. I also added better rejection, so runners that don't support this feature will reject the pipeline.
>>
>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>
>>> A larger question is how to support this in the portability layer. Right now portability assumes that each timer id corresponds to a logical input PCollection, but that assumption no longer holds now that we support a dynamic set of timers, each with its own id. We could instead model each timer family as a PCollection, but the FnApiRunner would then need to get the timer id dynamically in order to invoke it, whereas today it statically reads the timer id from the PCollection name.
>>>
>>> Reuven
>>>
>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests indeed never ran on any runner except the DirectRunner, which is something I should have noticed in the code review.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> I had a discussion with Rehman last week, and we discovered that the TimerMap-related tests were not running on all runners because they were not tagged as part of the ValidatesRunner category. I opened a PR [1] to enable this; could someone please help me with the review/merge?
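Reuven's idea of modeling each timer family, rather than each timer id, as the unit the portability layer wires up can be illustrated with a small plain-Java model. This is a hypothetical sketch, not Beam's Fn API or the FnApiRunner: `TimerFamilyRouter` and its nested `FiredTimer` are invented names. The routing key (the family) is registered statically, while the timer id is read dynamically from each fired timer instead of from a channel or PCollection name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical model (not Beam's Fn API): fired timers are routed by timer family,
 * which is known when the pipeline is built, while the timer id, which may be
 * computed per element at runtime, travels inside each fired-timer payload.
 */
final class TimerFamilyRouter {
  /** A fired timer as it might arrive on the channel for its family. */
  static final class FiredTimer {
    final String family;
    final String timerId;
    final long timestamp;

    FiredTimer(String family, String timerId, long timestamp) {
      this.family = family;
      this.timerId = timerId;
      this.timestamp = timestamp;
    }
  }

  // Static part: one callback per declared family, registered up front.
  private final Map<String, BiConsumer<String, Long>> callbacks = new HashMap<>();

  void registerFamily(String family, BiConsumer<String, Long> callback) {
    callbacks.put(family, callback);
  }

  /** Dynamic part: the id comes from the payload, not from the channel's name. */
  void deliver(FiredTimer timer) {
    BiConsumer<String, Long> callback = callbacks.get(timer.family);
    if (callback == null) {
      throw new IllegalStateException("No callback registered for timer family: " + timer.family);
    }
    callback.accept(timer.timerId, timer.timestamp);
  }
}
```

For example, with a callback registered for the family "timers", delivering `new FiredTimer("timers", "actionTypeA", 5L)` passes "actionTypeA" and 5 to that single callback; a timer for an unregistered family raises an exception.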
>>>>>
>>>>> Out of curiosity I took a look and discovered that they only pass on the Direct runner and on the classic Flink runner in batch mode. They do not pass on Dataflow [2][3] or on the portable Flink runner, so it is probably worth reopening the issue to investigate and fix.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>
>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be fairly trivial.
>>>>>>
>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Before we made the Flink Runner comply with the Beam semantics of overwriting past invocations, it allowed a timer to be set multiple times. I wouldn't be surprised if the Spark Runner never addressed this: Flink and Spark themselves allow a timer to be set multiple times. To fix this for Beam, the Flink Runner has to maintain a checkpointed map that sits outside of its built-in TimerService.
>>>>>>>
>>>>>>> As far as I can see, multiple timer families are currently not supported in the Flink Runner because the map does not take the family name into account. This can be easily fixed, though.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>> > The new timer family is in the portability protos. I think TimerReceiver needs to be updated to set it, though (I think a 1-line change).
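Max's description of the Flink Runner fix, a separate map maintained on top of a timer service that accepts repeated registrations, can be sketched as follows. This is an illustration under assumptions, not Flink's or Beam's code: `OverwritingTimers` is a hypothetical name, a `PriorityQueue` stands in for the engine's timer service, and the side map is an ordinary `HashMap` rather than checkpointed state. Keying that map by both family and id corresponds to the fix Max mentions for supporting multiple timer families.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch (not Flink or Beam code): "last set wins" timer semantics
 * layered on a queue that, like an engine's native timer service, accepts
 * several registrations for the same timer.
 */
final class OverwritingTimers {
  /** One registration in the underlying queue; duplicates per timer are allowed. */
  private static final class Entry {
    final String family;
    final String id;
    final long timestamp;

    Entry(String family, String id, long timestamp) {
      this.family = family;
      this.id = id;
      this.timestamp = timestamp;
    }
  }

  // Stand-in for the engine's timer service, ordered by firing time.
  private final PriorityQueue<Entry> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

  // Side map: latest timestamp per (family, id). A real runner would checkpoint it.
  private final Map<String, Long> latest = new HashMap<>();

  // Composite key; the NUL separator keeps ("a", "bc") and ("ab", "c") distinct.
  private static String key(String family, String id) {
    return family + '\u0000' + id;
  }

  /** Sets or overwrites a timer; the older queue entry becomes stale but is not removed. */
  void set(String family, String id, long timestamp) {
    latest.put(key(family, id), timestamp);
    queue.add(new Entry(family, id, timestamp));
  }

  /** Fires live timers due at or before the watermark, returning "family/id@timestamp" labels. */
  List<String> advanceTo(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
      Entry e = queue.poll();
      String k = key(e.family, e.id);
      Long current = latest.get(k);
      if (current != null && current == e.timestamp) {
        // This entry reflects the most recent set(): fire it once and forget the timer.
        latest.remove(k);
        fired.add(e.family + "/" + e.id + "@" + e.timestamp);
      }
      // Otherwise the entry was superseded by a later set() (or already fired) and is skipped.
    }
    return fired;
  }
}
```

Setting ("timers", "a") at 10 and then at 30 leaves two queue entries, but only the one at 30 fires; a timer with the same id in the family "other" fires independently. Skipping stale entries lazily avoids needing a delete operation on the underlying service.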
>>>>>>> >
>>>>>>> > The TimerInternals class that runners implement today already handles dynamic timers, so most of the work was in the Beam SDK, providing an API that lets users access this feature.
>>>>>>> >
>>>>>>> > The main work needed in the runners was to take the timer family into account. Beam semantics say that if a timer is set twice with the same id, the second timer overwrites the first, so several runners kept maps from timer id -> timer. However, since the timer family scopes the timers, we now allow two timers with the same id as long as their timer families differ. Runners had to be updated to include the timer family id in the map keys.
>>>>>>> >
>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark ValidatesRunner, even though the Spark runner wasn't updated! I wonder if this means that the Spark runner was incorrectly implementing the Beam semantics before, and setTimer was not overwriting timers with the same id?
>>>>>>> >
>>>>>>> > Reuven
>>>>>>> >
>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > This looks great, thanks for the contribution, Rehman!
>>>>>>> >
>>>>>>> > I have some questions (note that I have not looked at the code at all).
>>>>>>> >
>>>>>>> > - Does this work for both portable and non-portable runners?
>>>>>>> > - What do other runners (e.g. Spark) need to implement to support this?
>>>>>>> >
>>>>>>> > It may be worth adding this to the Compatibility Matrix on the website.
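The runner-side change Reuven describes (timer maps keyed by family and id instead of by id alone) might look like the following for a runner that keeps its own timer bookkeeping. `TimerTable` is a hypothetical class, not the `TimerInternals` interface, and timestamps are plain epoch milliseconds for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical runner-side timer bookkeeping (not Beam's TimerInternals API).
 * Timers are keyed by (family, id): setting the same id twice within a family
 * overwrites the earlier timer, while equal ids in different families coexist.
 */
final class TimerTable {
  // family -> (timer id -> firing timestamp in epoch millis)
  private final Map<String, Map<String, Long>> timers = new HashMap<>();

  /** Sets or overwrites the timer identified by (family, id). */
  void set(String family, String id, long timestampMillis) {
    timers.computeIfAbsent(family, f -> new HashMap<>()).put(id, timestampMillis);
  }

  /** Removes the timer if present, dropping empty families. */
  void delete(String family, String id) {
    Map<String, Long> byId = timers.get(family);
    if (byId != null) {
      byId.remove(id);
      if (byId.isEmpty()) {
        timers.remove(family);
      }
    }
  }

  /** Returns the pending timestamp, or null if no such timer is set. */
  Long get(String family, String id) {
    Map<String, Long> byId = timers.get(family);
    return byId == null ? null : byId.get(id);
  }

  /** Total number of pending timers across all families. */
  int size() {
    int n = 0;
    for (Map<String, Long> byId : timers.values()) {
      n += byId.size();
    }
    return n;
  }
}
```

Calling `set("timers", "mainTimer", 10)` and then `set("timers", "mainTimer", 20)` leaves one timer at 20, while `set("other", "mainTimer", 5)` adds a second, independent timer. A table keyed only by id would have merged the two families into one entry.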
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Ismaël
>>>>>>> >
>>>>>>> > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <rehman.murad...@venturedive.com> wrote:
>>>>>>> >
>>>>>>> > Thank you, Reuven, for the guidance throughout the development process. I am delighted to contribute my two cents to the Beam project.
>>>>>>> >
>>>>>>> > Looking forward to more active contributions.
>>>>>>> >
>>>>>>> > Thanks & Regards,
>>>>>>> > Rehman Murad Ali
>>>>>>> > Software Engineer
>>>>>>> >
>>>>>>> > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
>>>>>>> >
>>>>>>> > Thanks to a lot of hard work by Rehman, Beam now supports dynamic timers. As a reminder, this was discussed on the dev list some time back.
>>>>>>> >
>>>>>>> > As background, previously one had to statically declare all timers in the code. So if you wanted two timers, you needed to create two timer variables and two callbacks, one for each timer. A number of users kept hitting stumbling blocks where they needed a dynamic set of timers (often based on the element), which Beam did not support. The workarounds were quite ugly and complicated.
>>>>>>> >
>>>>>>> > The new support allows declaring a TimerMap, which is a map of timers. Each TimerMap is scoped by a family name, so you can create multiple TimerMaps, each with its own callback.
>>>>>>> > The usage looks as follows:
>>>>>>> >
>>>>>>> >   class MyDoFn extends DoFn<...> {
>>>>>>> >     @TimerFamily("timers")
>>>>>>> >     private final TimerSpec timerMap =
>>>>>>> >         TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>> >
>>>>>>> >     @ProcessElement
>>>>>>> >     public void process(@TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>> >       timers.set("mainTimer", timestamp);
>>>>>>> >       timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>> >     }
>>>>>>> >
>>>>>>> >     @OnTimerFamily("timers")
>>>>>>> >     public void onTimer(@TimerId String timerId) {
>>>>>>> >       System.out.println("Timer fired. id: " + timerId);
>>>>>>> >     }
>>>>>>> >   }
>>>>>>> >
>>>>>>> > This currently works for the Flink and Dataflow runners.
>>>>>>> >
>>>>>>> > Thank you, Rehman, for getting this done! Beam users will find it very valuable.
>>>>>>> >
>>>>>>> > Reuven
>>>>>>> >
>>>>>>>
>>>>>>