This is running as part of the validatesRunnerBatch test, but it is executing a streaming test. Maybe that's why it's failing?
On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <[email protected]> wrote:
> Ismael,
>
> As part of that fix, I added some new tests to make sure these tests run on both batch and streaming runners, as I realized that the test was previously running only on batch runners. I did this by explicitly setting the isBounded attribute on the output of the Create transform. Somehow these new tests are making the Flink runner unhappy.
>
> I'm not sure why explicitly setting the PCollection to be unbounded breaks the Flink runner. We can try to exclude the Flink runner from these tests for now, but maybe Max has an idea.
>
> Also Ismael, what makes this a batch mode test? When I look at the failing stack trace, the failure is in FlinkStreamingPipelineTranslator.
>
> Reuven
>
> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <[email protected]> wrote:
>
>> Apparently the Dynamic Timers fix for Dataflow broke the ValidatesRunner tests for Flink in batch mode, which were passing before. Reuven or Rehman, can you please take a look?
>> The tests have been failing since the exact commit of the fix:
>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>> For details on the exception:
>>
>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>
>>
>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> Great to know you got it working on Dataflow easily, Reuven. As a new feature it looks great!
>>>
>>> I agree with Kenn that it may be worth opening a new thread to discuss the changes still needed to support this in portable runners.
>>>
>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I think the (lack of) portability bit may have been buried in this thread. Maybe a new thread about the design for that?
>>>>
>>>> Kenn
>>>>
>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> FYI, this is now fixed for Dataflow. I also added better rejection so that runners that don't support this feature will reject the pipeline.
>>>>>
>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>
>>>>>> A larger question is how to support this in the portability layer. Right now portability assumes that each timer id corresponds to a logical input PCollection, but that assumption no longer holds now that we support a dynamic set of timers, each with its own id. We could instead model each timer family as a PCollection, but the FnApiRunner would then need to get the timer id dynamically in order to invoke the timer, whereas today it statically reads the timer id from the PCollection name.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests indeed never ran on any runner except the DirectRunner, which is something I should've noticed in the code review.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>
>>>>>>>> I had a discussion with Rehman last week and we discovered that the TimerMap-related tests were not running for all runners because they were not tagged as part of the ValidatesRunner category. I opened a PR [1] to enable this, so could someone please help me with the review/merge?
>>>>>>>>
>>>>>>>> I took a look out of curiosity and discovered that they are only passing for the Direct runner and for the classic Flink runner in batch mode. They are not passing for Dataflow [2][3] or for the Portable Flink runner, so it is probably worth reopening the issue to investigate/fix.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be fairly trivial.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we made it comply with the Beam semantics of overwriting past invocations. I wouldn't be surprised if the Spark Runner never addressed this. Flink and Spark themselves allow a timer to be set multiple times. In order to fix this for Beam, the Flink Runner has to maintain a checkpointed map which sits outside of its built-in TimerService.
>>>>>>>>>>
>>>>>>>>>> As far as I can see, multiple timer families are currently not supported in the Flink Runner because the map does not take the family name into account. This can be easily fixed, though.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>> > The new timer family is in the portability protos. I think TimerReceiver needs to be updated to set it, though (I think it's a 1-line change).
>>>>>>>>>> >
>>>>>>>>>> > The TimerInternals class that runners implement today already handles dynamic timers, so most of the work was in the Beam SDK to provide an API that allows users to access this feature.
>>>>>>>>>> >
>>>>>>>>>> > The main work needed in the runners was to take the timer family into account. Beam semantics say that if a timer is set twice with the same id, the second timer overwrites the first, so several runners had maps from timer id -> timer. However, since the timer family scopes the timers, we now allow two timers with the same id as long as their timer families differ. Runners had to be updated to include the timer family id in the map keys.
>>>>>>>>>> >
>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark ValidatesRunner, even though the Spark runner wasn't updated! I wonder if this means that the Spark runner was incorrectly implementing the Beam semantics before, and setTimer was not overwriting timers with the same id?
>>>>>>>>>> >
>>>>>>>>>> > Reuven
>>>>>>>>>> >
>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>>> >
>>>>>>>>>> >     This looks great, thanks for the contribution, Rehman!
>>>>>>>>>> >
>>>>>>>>>> >     I have some questions (note that I have not looked at the code at all):
>>>>>>>>>> >
>>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>>> >     - What do other runners (e.g. Spark) need to implement to support this?
>>>>>>>>>> >
>>>>>>>>>> >     It may be worth adding this to the Compatibility Matrix on the website.
>>>>>>>>>> >
>>>>>>>>>> >     Regards,
>>>>>>>>>> >     Ismaël
>>>>>>>>>> >
>>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>>>>>>>> >
>>>>>>>>>> >         Thank you, Reuven, for the guidance throughout the development process. I am delighted to contribute my two cents to the Beam project.
>>>>>>>>>> >
>>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>>> >
>>>>>>>>>> >         Thanks & Regards,
>>>>>>>>>> >
>>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>>> >         Software Engineer
>>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>>> >
>>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>> >
>>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports dynamic timers. As a reminder, this was discussed on the dev list some time back.
>>>>>>>>>> >
>>>>>>>>>> >             As background, previously one had to statically declare all timers in the code. So if you wanted two timers, you needed to create two timer variables and two callbacks, one for each timer. A number of users kept hitting stumbling blocks where they needed a dynamic set of timers (often based on the element), which was not supported in Beam. The workarounds were quite ugly and complicated.
>>>>>>>>>> >
>>>>>>>>>> >             The new support allows declaring a TimerMap, which is a map of timers. Each TimerMap is scoped by a family name, so you can create multiple TimerMaps, each with its own callback. Usage looks as follows:
>>>>>>>>>> >
>>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>>> >               @TimerFamily("timers")
>>>>>>>>>> >               private final TimerSpec timerMap = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>> >
>>>>>>>>>> >               @ProcessElement
>>>>>>>>>> >               public void process(@TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>>>>> >                 timers.set("mainTimer", timestamp);
>>>>>>>>>> >                 timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>>> >               }
>>>>>>>>>> >
>>>>>>>>>> >               @OnTimerFamily("timers")
>>>>>>>>>> >               public void onTimer(@TimerId String timerId) {
>>>>>>>>>> >                 System.out.println("Timer fired. id: " + timerId);
>>>>>>>>>> >               }
>>>>>>>>>> >             }
>>>>>>>>>> >
>>>>>>>>>> >             This currently works for the Flink and Dataflow runners.
>>>>>>>>>> >
>>>>>>>>>> >             Thank you, Rehman, for getting this done! Beam users will find it very valuable.
>>>>>>>>>> >
>>>>>>>>>> >             Reuven
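The runner-side bookkeeping that Max and Reuven describe in this thread (setting a timer again with the same id overwrites the earlier one, and the timer family scopes the id) can be illustrated with a small in-memory model. This is a hypothetical sketch: `TimerTable`, `TimerKey`, `set`, and `fireUpTo` are invented for illustration and are not the Beam, Flink, or Spark runner implementations, and a real runner would keep this map in checkpointed state rather than in a `HashMap`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not actual runner code: keeps at most one pending
// timer per (timer family, timer id) pair.
final class TimerTable {

  /** Composite key: the family scopes the id, so both fields take part in equality. */
  static final class TimerKey {
    private final String family;
    private final String id;

    TimerKey(String family, String id) {
      this.family = Objects.requireNonNull(family);
      this.id = Objects.requireNonNull(id);
    }

    String family() { return family; }

    String id() { return id; }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TimerKey)) return false;
      TimerKey other = (TimerKey) o;
      return family.equals(other.family) && id.equals(other.id);
    }

    @Override
    public int hashCode() { return Objects.hash(family, id); }

    @Override
    public String toString() { return family + "/" + id; }
  }

  // Stands in for the map a runner keeps outside its built-in timer service.
  private final Map<TimerKey, Instant> pending = new HashMap<>();

  /**
   * Sets a timer, overwriting any earlier timer with the same family and id.
   * Returns the overwritten timestamp (or null if there was none); a runner backed
   * by a timer service that tolerates duplicate registrations would delete that
   * older registration from the service.
   */
  Instant set(String family, String id, Instant timestamp) {
    return pending.put(new TimerKey(family, id), timestamp);
  }

  int size() { return pending.size(); }

  /** Removes and returns all timers due at or before the watermark, earliest first. */
  List<TimerKey> fireUpTo(Instant watermark) {
    List<TimerKey> due = new ArrayList<>();
    for (Map.Entry<TimerKey, Instant> entry : pending.entrySet()) {
      if (!entry.getValue().isAfter(watermark)) {
        due.add(entry.getKey());
      }
    }
    // Order by timestamp; break ties by family and id so the output is deterministic.
    due.sort(Comparator.comparing((TimerKey k) -> pending.get(k))
        .thenComparing(TimerKey::family)
        .thenComparing(TimerKey::id));
    for (TimerKey key : due) {
      pending.remove(key);
    }
    return due;
  }
}
```

For example, setting `timers`/`mainTimer` at 30 ms and then at 10 ms leaves a single pending timer at 10 ms, while `other`/`mainTimer` remains a separate timer. Keying the map by timer id alone would let the `other` call overwrite the timer in the `timers` family, which is the kind of family-scoping gap Max mentions for the Flink runner. Dropping the map and registering every call with a timer service that tolerates duplicates would instead fire `timers`/`mainTimer` twice, once at each timestamp, which is the possibility Reuven raises about the Spark runner.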
