Ismael,

As part of that fix I added some new tests so that these tests run on both
batch and streaming runners, as I realized that they were running only on
batch runners before. I did this by explicitly setting the isBounded
attribute on the output of the Create transform. Somehow the new tests are
making the Flink runner unhappy.

I'm not sure why explicitly setting the PCollection to be unbounded is
breaking on the Flink runner. We can try excluding the Flink runner from
these tests for now, but maybe Max has an idea.

Also Ismael, what makes this a batch mode test? When I look at the failing
stack trace, the failure is in FlinkStreamingPipelineTranslator.

Reuven

On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <[email protected]> wrote:

> Apparently the fix of Dynamic Timers for Dataflow broke the
> ValidatesRunner tests for Flink in batch mode that were passing before.
> Can you please take a look, Reuven or Rehman?
> Tests have been failing since the exact commit for the fix:
> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
> For details on the exception:
>
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>
>
> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <[email protected]> wrote:
>
>> Great to know you got it working on Dataflow easily, Reuven. As a new
>> feature it looks great!
>>
>> I agree with Kenn: it may be worth opening a new thread to discuss the
>> changes still needed to support this in portable runners.
>>
>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> I think the (lack of) portability bit may have been buried in this
>>> thread. Maybe a new thread about the design for that?
>>>
>>> Kenn
>>>
>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>> that runners that don't support this feature will reject the pipeline.
>>>>
>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>
>>>>> A larger question is how to support this in the portability layer.
>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>> input PCollection, but that assumption no longer works as we now support a
>>>>> dynamic set of timers, each with its own id. We could instead model each
>>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>>> dynamically get the timer id in order to invoke it, and today it
>>>>> statically reads the timer id from the PCollection name.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>> something I should've noticed in the code review.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>> TimersMap related tests were not running for all runners because they
>>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>>> PR [1] to enable this, so could someone please help me with the
>>>>>>> review/merge?
>>>>>>>
>>>>>>> I took a look out of curiosity and discovered that they are only
>>>>>>> passing for the Direct runner and for the classic Flink runner in
>>>>>>> batch mode. They are not passing for Dataflow [2][3] or for the
>>>>>>> Portable Flink runner, so it is probably worth reopening the issue to
>>>>>>> investigate/fix.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>> [2]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>> [3]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>>>> fairly trivial.
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>>>> multiple times. In order to fix this for Beam, the Flink Runner has
>>>>>>>>> to maintain a checkpointed map which sits outside of its builtin
>>>>>>>>> TimerService.
>>>>>>>>>
>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>> supported in the Flink Runner due to the map not taking the family
>>>>>>>>> name into account. This can be easily fixed though.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>>> > 1-line change).
>>>>>>>>> >
>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK to
>>>>>>>>> > provide an API that allows users to access this feature.
>>>>>>>>> >
>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>> > timer family. Beam semantics say that if a timer is set twice with
>>>>>>>>> > the same id, then the second timer overwrites the first. Several
>>>>>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>>>>>> > the timer family scopes the timers, we now allow two timers with
>>>>>>>>> > the same id as long as the timer families are different. Runners
>>>>>>>>> > had to be updated to include the timer family id in the map keys.
>>>>>>>>> >
>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]
>>>>>>>>> > <mailto:[email protected]>> wrote:
>>>>>>>>> >
>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>> >
>>>>>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>>>>>> >
>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>> >       (e.g. Spark)?
>>>>>>>>> >
>>>>>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>>>>>> >
>>>>>>>>> >     Regards,
>>>>>>>>> >     Ismaël
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>> >     <[email protected]
>>>>>>>>> >     <mailto:[email protected]>> wrote:
>>>>>>>>> >
>>>>>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>>>>>> >         process. I am delighted to contribute my two cents to the Beam
>>>>>>>>> >         project.
>>>>>>>>> >
>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>> >
>>>>>>>>> >         Thanks & Regards
>>>>>>>>> >
>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>> >         Software Engineer
>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]
>>>>>>>>> >         <mailto:[email protected]>> wrote:
>>>>>>>>> >
>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>>>> >             dynamic timers. As a reminder, this was discussed on the dev
>>>>>>>>> >             list some time back.
>>>>>>>>> >
>>>>>>>>> >             As background, previously you had to statically declare all
>>>>>>>>> >             timers in your code. So if you wanted to have two timers,
>>>>>>>>> >             you needed to create two timer variables and two callbacks,
>>>>>>>>> >             one for each timer. A number of users kept hitting stumbling
>>>>>>>>> >             blocks where they needed a dynamic set of timers (often
>>>>>>>>> >             based on the element), which was not supported in Beam. The
>>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>>> >
>>>>>>>>> >             The new support allows declaring a TimerMap, which is a map
>>>>>>>>> >             of timers. Each TimerMap is scoped by a family name, so you
>>>>>>>>> >             can create multiple TimerMaps, each with its own callback.
>>>>>>>>> >             Usage looks as follows:
>>>>>>>>> >
>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>> >                 // Declares a family of dynamically named timers.
>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>> >
>>>>>>>>> >                 @ProcessElement
>>>>>>>>> >                 public void process(
>>>>>>>>> >                         @TimerFamily("timers") TimerMap timers,
>>>>>>>>> >                         @Element Type e) {
>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>> >                 }
>>>>>>>>> >
>>>>>>>>> >                 // One callback serves every timer in the family.
>>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>> >                 }
>>>>>>>>> >             }
>>>>>>>>> >
>>>>>>>>> >             This currently works for the Flink and the Dataflow runners.
>>>>>>>>> >
>>>>>>>>> >             Thank you Rehman for getting this done! Beam users will find
>>>>>>>>> >             it very valuable.
>>>>>>>>> >
>>>>>>>>> >             Reuven
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
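
For reference, the timer-family scoping described in the quoted messages
above (a second set with the same id overwrites the first, but two timers
may share an id if their families differ) can be sketched roughly as below.
This is only an illustration using plain Java collections: TimerKey,
PendingTimers and TimerFamilyDemo are hypothetical names, not Beam or Flink
classes, and a real runner would keep this state checkpointed rather than in
an in-memory HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: a timer is identified by its family AND its id. */
final class TimerKey {
    private final String familyId;
    private final String timerId;

    TimerKey(String familyId, String timerId) {
        this.familyId = Objects.requireNonNull(familyId);
        this.timerId = Objects.requireNonNull(timerId);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerKey)) {
            return false;
        }
        TimerKey other = (TimerKey) o;
        return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(familyId, timerId);
    }
}

/** Hypothetical stand-in for the pending-timer map a runner keeps (assumption, not runner code). */
final class PendingTimers {
    private final Map<TimerKey, Long> fireTimes = new HashMap<>();

    /** Setting a timer again with the same (family, id) overwrites the earlier fire time. */
    void set(String familyId, String timerId, long fireTimeMillis) {
        fireTimes.put(new TimerKey(familyId, timerId), fireTimeMillis);
    }

    /** Returns the pending fire time, or null if no such timer is set. */
    Long get(String familyId, String timerId) {
        return fireTimes.get(new TimerKey(familyId, timerId));
    }

    int size() {
        return fireTimes.size();
    }
}

final class TimerFamilyDemo {
    public static void main(String[] args) {
        PendingTimers timers = new PendingTimers();
        timers.set("familyA", "mainTimer", 100L);
        timers.set("familyA", "mainTimer", 200L); // same family and id: overwrites
        timers.set("familyB", "mainTimer", 300L); // same id, other family: kept separately

        System.out.println("pending timers: " + timers.size());
        System.out.println("familyA/mainTimer: " + timers.get("familyA", "mainTimer"));
        System.out.println("familyB/mainTimer: " + timers.get("familyB", "mainTimer"));
    }
}
```

If the map were keyed by timer id alone, the third set() call would silently
replace the familyA timer, which is the kind of collision the family-aware
key is meant to avoid.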
