This is running as part of the validatesRunnerBatch test, but it is
executing a streaming test. Maybe that's why it's failing?

On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <re...@google.com> wrote:

> Ismael,
>
> As part of that fix I added some new tests to make sure to run these tests
> on both batch and streaming runners, as I realized that the test was
> running only on batch runners before.  I did this by explicitly setting the
> isBounded attribute on the output of the Create transform. Somehow these
> new tests I added are making the Flink runner unhappy.
>
> I'm not sure why explicitly setting the PCollection to be unbounded is
> breaking on the Flink runner. We can try and exclude the flink runner from
> these tests for now, but maybe Max has an idea.
>
> Also Ismael, what makes this a batch mode test? When I look at the failing
> stack trace, the failure is in FlinkStreamingPipelineTranslator.
>
> Reuven
>
> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> Apparently the fix of Dynamic Timers for Dataflow broke the
>> ValidatesRunner tests for Flink in batch mode that were passing before.
>> Can you please take a look, Reuven or Rehman?
>> Tests are failing since the exact commit for the fix:
>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>> For details on the exception:
>>
>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>
>>
>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>>> Great to know you got it working on Dataflow easily, Reuven. As a new
>>> feature it looks great!
>>>
>>> Agree with Kenn, it may be worth opening a new thread to discuss the
>>> changes still needed to support this in portable runners.
>>>
>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> I think the (lack of) portability bit may have been buried in this
>>>> thread. Maybe a new thread about the design for that?
>>>>
>>>> Kenn
>>>>
>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>>> that runners that don't support this feature will reject the pipeline.
>>>>>
>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>
>>>>>> A larger question is how to support this in the portability layer.
>>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>>> input PCollection, but that assumption no longer works as we now support
>>>>>> a dynamic set of timers, each with its own id. We could instead model
>>>>>> each timer family as a PCollection, but the FnApiRunner would need to
>>>>>> dynamically get the timer id in order to invoke it, and today it
>>>>>> statically reads the timer id from the PCollection name.
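
To make the static-versus-dynamic point above concrete, here is a rough sketch in plain Java. It assumes one input stream per timer family, with each fired timer carrying its own id in the payload rather than the id being implied by a stream name. Every type and method name here (TimerFamilyDispatch, FiredTimer, register, dispatch) is hypothetical and does not exist in Beam or the Fn API; it only illustrates the idea, not an actual design.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these types exist in Beam.
final class TimerFamilyDispatch {

    // A fired timer that carries its own id in the payload, instead of the
    // id being implied by the name of the stream it arrived on.
    static final class FiredTimer {
        final String familyId;
        final String timerId;

        FiredTimer(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }
    }

    // One callback per timer family; the callback receives the dynamic id.
    private final Map<String, Consumer<String>> callbacks = new HashMap<>();

    void register(String familyId, Consumer<String> onTimer) {
        callbacks.put(familyId, onTimer);
    }

    // Looks up the callback by family and passes the timer id at runtime.
    void dispatch(FiredTimer timer) {
        Consumer<String> callback = callbacks.get(timer.familyId);
        if (callback == null) {
            throw new IllegalArgumentException(
                "Unknown timer family: " + timer.familyId);
        }
        callback.accept(timer.timerId);
    }
}
```

With this shape, only the family name is known statically; the set of timer ids can grow at runtime without adding new streams.
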
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The
>>>>>>> tests indeed never ran on any runner except for the DirectRunner, which
>>>>>>> is something I should've noticed in the code review.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>>> TimersMap related tests were not running for all runners because they
>>>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>>>> PR [1] to enable this, so could someone please help me with the
>>>>>>>> review/merge?
>>>>>>>>
>>>>>>>> I took a look out of curiosity and discovered that they are only
>>>>>>>> passing for the Direct runner and for the classic Flink runner in
>>>>>>>> batch mode. They are not passing for Dataflow [2][3] or for the
>>>>>>>> Portable Flink runner, so it is probably worth reopening the issue to
>>>>>>>> investigate/fix.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>> [2]
>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>> [3]
>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should
>>>>>>>>> be fairly trivial.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Flink Runner used to allow setting a timer multiple times before
>>>>>>>>>> we made it comply with the Beam semantics of overwriting past
>>>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>>>>> multiple times. In order to fix this for Beam, the Flink Runner has
>>>>>>>>>> to maintain a checkpointed map which sits outside of its builtin
>>>>>>>>>> TimerService.
>>>>>>>>>>
>>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>>> supported in the Flink Runner due to the map not taking the family
>>>>>>>>>> name into account. This can be easily fixed though.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>>>> > 1-line change).
>>>>>>>>>> >
>>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK to
>>>>>>>>>> > provide an API that allows users to access this feature.
>>>>>>>>>> >
>>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>>> > timer family. Beam semantics say that if a timer is set twice with
>>>>>>>>>> > the same id, then the second timer overwrites the first. Several
>>>>>>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>>>>>>> > the timer family scopes the timers, we now allow two timers with
>>>>>>>>>> > the same id as long as the timer families are different. Runners
>>>>>>>>>> > had to be updated to include the timer family id in the map keys.
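
The keying change described above can be sketched in plain Java as follows. This is a minimal illustration, assuming a simple in-memory map; FamilyScopedTimers and its Key class are hypothetical names, not the actual Flink or Dataflow runner code, and a real runner would keep this state checkpointed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; these are not Beam or Flink runner classes.
final class FamilyScopedTimers {

    // Composite key: the timer family scopes the timer id.
    static final class Key {
        final String familyId;
        final String timerId;

        Key(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return familyId.equals(other.familyId)
                && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Maps (family, id) to the pending firing timestamp in milliseconds.
    private final Map<Key, Long> pending = new HashMap<>();

    // Setting a timer with the same family and id overwrites the earlier one.
    void set(String familyId, String timerId, long timestampMillis) {
        pending.put(new Key(familyId, timerId), timestampMillis);
    }

    // Returns the pending timestamp, or null if no such timer is set.
    Long get(String familyId, String timerId) {
        return pending.get(new Key(familyId, timerId));
    }

    int size() {
        return pending.size();
    }
}
```

If the key were the timer id alone, a timer "t1" in one family would silently overwrite "t1" in another, which is the failure mode the runners had to avoid.
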
>>>>>>>>>> >
>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>>> >
>>>>>>>>>> > Reuven
>>>>>>>>>> >
>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>>>> > wrote:
>>>>>>>>>> >
>>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>>> >
>>>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>>>> >     all).
>>>>>>>>>> >
>>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>>> >       (e.g. Spark)?
>>>>>>>>>> >
>>>>>>>>>> >     Maybe worth adding this to the website Compatibility Matrix.
>>>>>>>>>> >
>>>>>>>>>> >     Regards,
>>>>>>>>>> >     Ismaël
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>>> >     <rehman.murad...@venturedive.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>>>>>>> >         process. I am delighted to contribute my two cents to the
>>>>>>>>>> >         Beam project.
>>>>>>>>>> >
>>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>>> >
>>>>>>>>>> >         Thanks & Regards
>>>>>>>>>> >
>>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>>> >         Software Engineer
>>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax
>>>>>>>>>> >         <re...@google.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>>>>>>> >             dev list some time back.
>>>>>>>>>> >
>>>>>>>>>> >             As background, previously you had to statically declare
>>>>>>>>>> >             all timers in your code. So if you wanted to have two
>>>>>>>>>> >             timers, you needed to create two timer variables and two
>>>>>>>>>> >             callbacks, one for each timer. A number of users kept
>>>>>>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>>>>>>> >             of timers (often based on the element), which was not
>>>>>>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>>>>>>> >             complicated.
>>>>>>>>>> >
>>>>>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>>>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>>>>>>> >             callback. The use looks as follows:
>>>>>>>>>> >
>>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>>> >                 // Declares a family of timers named "timers".
>>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>> >
>>>>>>>>>> >                 @ProcessElement
>>>>>>>>>> >                 public void process(
>>>>>>>>>> >                         @TimerFamily("timers") TimerMap timers,
>>>>>>>>>> >                         @Element Type e) {
>>>>>>>>>> >                     // Timer ids are chosen at runtime.
>>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>>> >                 }
>>>>>>>>>> >
>>>>>>>>>> >                 // A single callback handles every timer in the family.
>>>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>>> >                 }
>>>>>>>>>> >             }
>>>>>>>>>> >
>>>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>>>> >             runners.
>>>>>>>>>> >
>>>>>>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>>>>>>> >             find it very valuable.
>>>>>>>>>> >
>>>>>>>>>> >             Reuven
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>