+1 for a portable implementation and design. Developing features with only the non-portable implementation in mind means the portability effort gets bogged down filling in features that were only partially completed.
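
For anyone skimming the quoted thread below: the runner-side change Reuven and Max describe (a timer set twice with the same id overwrites the earlier one, but the same id may coexist in different timer families) comes down to which key the runner uses for its timer map. The following is only a rough sketch of that idea, not Beam or Flink runner code. The names TimerFamilySketch, TimerKey and PendingTimers are hypothetical, and a plain in-memory HashMap stands in for whatever checkpointed state a real runner would use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TimerFamilySketch {

  /** Hypothetical composite key: timer family id plus timer id. */
  public static final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof TimerKey)) {
        return false;
      }
      TimerKey other = (TimerKey) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  /** Hypothetical stand-in for a runner's bookkeeping of pending timers. */
  public static final class PendingTimers {
    private final Map<TimerKey, Long> fireTimes = new HashMap<>();

    /** Setting a timer replaces any pending timer with the same family and id. */
    public void set(String familyId, String timerId, long fireAtMillis) {
      fireTimes.put(new TimerKey(familyId, timerId), fireAtMillis);
    }

    /** Returns the pending fire time, or null if no such timer is set. */
    public Long get(String familyId, String timerId) {
      return fireTimes.get(new TimerKey(familyId, timerId));
    }

    public int size() {
      return fireTimes.size();
    }
  }

  public static void main(String[] args) {
    PendingTimers timers = new PendingTimers();
    timers.set("familyA", "mainTimer", 100L);
    timers.set("familyA", "mainTimer", 200L); // same family and id: overwrites
    timers.set("familyB", "mainTimer", 300L); // same id, other family: kept separately
    System.out.println("pending timers: " + timers.size());
    System.out.println("familyA/mainTimer fires at: " + timers.get("familyA", "mainTimer"));
  }
}
```

If the map were keyed by timer id alone, the third call would silently replace the familyA timer, which matches the limitation Max describes for the Flink Runner.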
On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <[email protected]> wrote:

> Apparently the fix of Dynamic Timers for Dataflow broke the
> ValidatesRunner tests for Flink in batch mode that were passing before.
> Can you please take a look Reuven or Rehman.
> Tests are failing since the exact commit for the fix:
> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
> For details on the exception:
>
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>
>
> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <[email protected]> wrote:
>
>> Great to know you get it working on Dataflow easily Reuven. As a new
>> feature it looks great!
>>
>> Agree with Kenn maybe worth to open a new thread to discuss the changes
>> still needed to support this in portable runners.
>>
>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> I think the (lack of) portability bit may have been buried in this
>>> thread. Maybe a new thread about the design for that?
>>>
>>> Kenn
>>>
>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>> that runners that don't support this feature will reject the pipeline.
>>>>
>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>
>>>>> A larger question is how to support this in the portability layer.
>>>>> Right now portability assumes that each timer id corresponds to a
>>>>> logical input PCollection, but that assumption no longer works as we
>>>>> now support a dynamic set of timers, each with their own id. We could
>>>>> instead model each timer family as a PCollection, but the FnApiRunner
>>>>> would need to dynamically get the timer id in order to invoke it, and
>>>>> today it statically reads the timer id from the PCollection name.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>> something I should've noticed in the code review.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>> TimersMap related tests were not running for all runners because
>>>>>>> they were not tagged as part of the ValidatesRunner category. I
>>>>>>> opened a PR [1] to enable this, so please someone help me with the
>>>>>>> review/merge.
>>>>>>>
>>>>>>> I took a look just for curiosity and discovered that they are only
>>>>>>> passing for Direct runner and for the classic Flink runner in batch
>>>>>>> mode. They are not passing for Dataflow [2][3] and for the Portable
>>>>>>> Flink runner, so probably worth to reopen the issue to
>>>>>>> investigate/fix.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>>>>> fairly trivial.
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>>> addressed this.
>>>>>>>>> Flink and Spark themselves allow a timer to be set multiple times.
>>>>>>>>> In order to fix this for Beam, the Flink Runner has to maintain a
>>>>>>>>> checkpointed map which sits outside of its builtin TimerService.
>>>>>>>>>
>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>> supported in the Flink Runner due to the map not taking the family
>>>>>>>>> name into account. This can be easily fixed though.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>>> > 1-line change).
>>>>>>>>> >
>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK
>>>>>>>>> > to provide an API that allows users to access this feature.
>>>>>>>>> >
>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>> > timer family. Beam semantics say that if a timer is set twice
>>>>>>>>> > with the same id, then the second timer overwrites the first.
>>>>>>>>> > Several runners therefore had maps from timer id -> timer.
>>>>>>>>> > However since the timer family scopes the timers, we now allow
>>>>>>>>> > two timers with the same id as long as the timer families are
>>>>>>>>> > different. Runners had to be updated to include the timer family
>>>>>>>>> > id in the map keys.
>>>>>>>>> >
>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]>
>>>>>>>>> > wrote:
>>>>>>>>> >
>>>>>>>>> > This looks great, thanks for the contribution Rehman!
>>>>>>>>> >
>>>>>>>>> > I have some questions (note I have not looked at the code at
>>>>>>>>> > all).
>>>>>>>>> >
>>>>>>>>> > - Is this working for both portable and non portable runners?
>>>>>>>>> > - What do other runners need to implement to support this
>>>>>>>>> >   (e.g. Spark)?
>>>>>>>>> >
>>>>>>>>> > Maybe worth to add this to the website Compatibility Matrix.
>>>>>>>>> >
>>>>>>>>> > Regards,
>>>>>>>>> > Ismaël
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>> > <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> > Thank you Reuven for the guidance throughout the development
>>>>>>>>> > process. I am delighted to contribute my two cents to the Beam
>>>>>>>>> > project.
>>>>>>>>> >
>>>>>>>>> > Looking forward to more active contributions.
>>>>>>>>> >
>>>>>>>>> > Thanks & Regards
>>>>>>>>> >
>>>>>>>>> > Rehman Murad Ali
>>>>>>>>> > Software Engineer
>>>>>>>>> > Mobile: +92 3452076766
>>>>>>>>> > Skype: rehman.muradali
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]>
>>>>>>>>> > wrote:
>>>>>>>>> >
>>>>>>>>> > Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>>>> > dynamic timers. As a reminder, this was discussed on the dev
>>>>>>>>> > list some time back.
>>>>>>>>> >
>>>>>>>>> > As background, previously one had to statically declare all
>>>>>>>>> > timers in your code.
>>>>>>>>> > So if you wanted to have two timers, you needed to create two
>>>>>>>>> > timer variables and two callbacks - one for each timer. A number
>>>>>>>>> > of users kept hitting stumbling blocks where they needed a
>>>>>>>>> > dynamic set of timers (often based on the element), which was
>>>>>>>>> > not supported in Beam. The workarounds were quite ugly and
>>>>>>>>> > complicated.
>>>>>>>>> >
>>>>>>>>> > The new support allows declaring a TimerMap, which is a map of
>>>>>>>>> > timers. Each TimerMap is scoped by a family name, so you can
>>>>>>>>> > create multiple TimerMaps each with its own callback. The use
>>>>>>>>> > looks as follows:
>>>>>>>>> >
>>>>>>>>> > class MyDoFn extends DoFn<...> {
>>>>>>>>> >   @TimerFamily("timers")
>>>>>>>>> >   private final TimerSpec timerMap =
>>>>>>>>> >       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>> >
>>>>>>>>> >   @ProcessElement
>>>>>>>>> >   public void process(
>>>>>>>>> >       @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>>>> >     timers.set("mainTimer", timestamp);
>>>>>>>>> >     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>> >   }
>>>>>>>>> >
>>>>>>>>> >   @OnTimerFamily("timers")
>>>>>>>>> >   public void onTimer(@TimerId String timerId) {
>>>>>>>>> >     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>> >   }
>>>>>>>>> > }
>>>>>>>>> >
>>>>>>>>> > This currently works for the Flink and the Dataflow runners.
>>>>>>>>> >
>>>>>>>>> > Thank you Rehman for getting this done! Beam users will find it
>>>>>>>>> > very valuable.
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
