Thanks for figuring this out! I didn't know about the UsesUnboundedPCollections
category.

On Fri, Feb 14, 2020 at 12:57 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> Exactly. Since the new tests use unbounded PCollections, we have to add the
> UsesUnboundedPCollections category.
> Also, the Flink runner was not excluding this category for the batch
> (bounded) tests.
> I opened this PR to fix it, PTAL Reuven:
> https://github.com/apache/beam/pull/10871
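
The category mechanism described in this message can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: it does not use JUnit or Beam, `CategoryFilterSketch`, `TaggedTest`, and the two test names are hypothetical, and the rule modeled (run a test if it carries an included category and no excluded one) is a simplification of how a batch ValidatesRunner suite might skip tests tagged UsesUnboundedPCollections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of category-based test selection; not JUnit or Beam code. */
final class CategoryFilterSketch {

  /** A test name plus the category names it is tagged with. */
  static final class TaggedTest {
    final String name;
    final Set<String> categories;

    TaggedTest(String name, String... categories) {
      this.name = name;
      this.categories = new HashSet<>(Arrays.asList(categories));
    }
  }

  /** Keeps tests that carry an included category and no excluded category. */
  static List<String> select(List<TaggedTest> tests, Set<String> include, Set<String> exclude) {
    List<String> selected = new ArrayList<>();
    for (TaggedTest test : tests) {
      boolean included = false;
      boolean excluded = false;
      for (String category : test.categories) {
        included |= include.contains(category);
        excluded |= exclude.contains(category);
      }
      if (included && !excluded) {
        selected.add(test.name);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // Test names are made up for illustration.
    List<TaggedTest> tests = Arrays.asList(
        new TaggedTest("boundedTimerTest", "ValidatesRunner"),
        new TaggedTest("unboundedTimerTest", "ValidatesRunner", "UsesUnboundedPCollections"));
    Set<String> include = new HashSet<>(Arrays.asList("ValidatesRunner"));
    Set<String> exclude = new HashSet<>(Arrays.asList("UsesUnboundedPCollections"));
    System.out.println(select(tests, include, exclude));
  }
}
```

With an empty exclude set, both tests are selected, which mirrors the situation where a batch suite ends up executing a streaming test.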
>
> On Fri, Feb 14, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:
>
>> This is running as part of the validatesRunnerBatch test, but it is
>> executing a streaming test. Maybe that's why it's failing?
>>
>> On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismael,
>>>
>>> As part of that fix I added some new tests to make sure to run these
>>> tests on both batch and streaming runners, as I realized that the test was
>>> running only on batch runners before.  I did this by explicitly setting the
>>> isBounded attribute on the output of the Create transform. Somehow these
>>> new tests I added are making the Flink runner unhappy.
>>>
>>> I'm not sure why explicitly setting the PCollection to be unbounded is
>>> breaking on the Flink runner. We can try and exclude the flink runner from
>>> these tests for now, but maybe Max has an idea.
>>>
>>> Also Ismael, what makes this a batch mode test? When I look at the
>>> failing stack trace, the failure is in FlinkStreamingPipelineTranslator.
>>>
>>> Reuven
>>>
>>> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> Apparently the fix of Dynamic Timers for Dataflow broke the
>>>> ValidatesRunner tests for Flink in batch mode that were passing before.
>>>> Can you please take a look, Reuven or Rehman?
>>>> Tests are failing since the exact commit for the fix:
>>>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>>>> For details on the exception:
>>>>
>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>>>
>>>>
>>>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ieme...@gmail.com>
>>>> wrote:
>>>>
>>>>> Great to know you got it working on Dataflow easily, Reuven. As a new
>>>>> feature it looks great!
>>>>>
>>>>> I agree with Kenn: it may be worth opening a new thread to discuss the
>>>>> changes still needed to support this in portable runners.
>>>>>
>>>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I think the (lack of) portability bit may have been buried in this
>>>>>> thread. Maybe a new thread about the design for that?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> FYI, this is now fixed for Dataflow. I also added better rejection
>>>>>>> so that runners that don't support this feature will reject the 
>>>>>>> pipeline.
>>>>>>>
>>>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>>>
>>>>>>>> A larger question is how to support this in the portability layer.
>>>>>>>> Right now portability assumes that each timer id corresponds to a
>>>>>>>> logical input PCollection, but that assumption no longer works as we
>>>>>>>> now support a dynamic set of timers, each with its own id. We could
>>>>>>>> instead model each timer family as a PCollection, but the FnApiRunner
>>>>>>>> would need to dynamically get the timer id in order to invoke it, and
>>>>>>>> today it statically reads the timer id from the PCollection name.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The
>>>>>>>>> tests indeed never ran on any runner except for the DirectRunner,
>>>>>>>>> which is something I should've noticed in the code review.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I had a discussion with Rehman last week and we discovered that
>>>>>>>>>> the TimerMap-related tests were not running for all runners
>>>>>>>>>> because they were not tagged as part of the ValidatesRunner
>>>>>>>>>> category. I opened a PR [1] to enable this, so could someone
>>>>>>>>>> please help me with the review/merge?
>>>>>>>>>>
>>>>>>>>>> Out of curiosity I took a look and discovered that they only pass
>>>>>>>>>> for the Direct runner and for the classic Flink runner in batch
>>>>>>>>>> mode. They do not pass for Dataflow [2][3] or for the Portable
>>>>>>>>>> Flink runner, so it is probably worth reopening the issue to
>>>>>>>>>> investigate/fix.
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>>>> [2]
>>>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>>>> [3]
>>>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should
>>>>>>>>>>> be fairly trivial.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels
>>>>>>>>>>> <m...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Flink Runner used to allow setting a timer multiple times
>>>>>>>>>>>> before we made it comply with the Beam semantics of overwriting
>>>>>>>>>>>> past invocations. I wouldn't be surprised if the Spark Runner
>>>>>>>>>>>> never addressed this. Flink and Spark themselves allow a timer
>>>>>>>>>>>> to be set multiple times. In order to fix this for Beam, the
>>>>>>>>>>>> Flink Runner has to maintain a checkpointed map which sits
>>>>>>>>>>>> outside of its builtin TimerService.
>>>>>>>>>>>>
>>>>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>>>>> supported in the Flink Runner because the map does not take the
>>>>>>>>>>>> family name into account. This can be easily fixed though.
>>>>>>>>>>>>
>>>>>>>>>>>> -Max
>>>>>>>>>>>>
>>>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>>>>>> > 1-line change).
>>>>>>>>>>>> >
>>>>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK
>>>>>>>>>>>> > to provide an API that allows users to access this feature.
>>>>>>>>>>>> >
>>>>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>>>>> > timer family. Beam semantics say that if a timer is set twice
>>>>>>>>>>>> > with the same id, then the second timer overwrites the first.
>>>>>>>>>>>> > Several runners therefore had maps from timer id -> timer.
>>>>>>>>>>>> > However, since the timer family scopes the timers, we now allow
>>>>>>>>>>>> > two timers with the same id as long as the timer families are
>>>>>>>>>>>> > different. Runners had to be updated to include the timer family
>>>>>>>>>>>> > id in the map keys.
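
The keying change described in this paragraph can be sketched in plain Java. This is a minimal illustration under stated assumptions, not code from Beam or from any runner: `TimerStoreSketch`, `TimerKey`, and their methods are hypothetical names, and the store is an ordinary in-memory map rather than checkpointed runner state. It shows the two properties the paragraph relies on: setting the same (family, id) pair twice overwrites the earlier timer, while the same id in a different family is a separate timer.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a runner's pending-timer table; not Beam or Flink code. */
final class TimerStoreSketch {

  /** Composite key: the family scopes the id, so both take part in equality. */
  static final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TimerKey)) {
        return false;
      }
      TimerKey other = (TimerKey) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  private final Map<TimerKey, Instant> pending = new HashMap<>();

  /** Setting a timer again with the same (family, id) replaces the earlier firing time. */
  void setTimer(String familyId, String timerId, Instant fireAt) {
    pending.put(new TimerKey(familyId, timerId), fireAt);
  }

  /** Returns the pending firing time, or null if no such timer is set. */
  Instant firingTime(String familyId, String timerId) {
    return pending.get(new TimerKey(familyId, timerId));
  }

  int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    TimerStoreSketch store = new TimerStoreSketch();
    store.setTimer("familyA", "t1", Instant.ofEpochMilli(1000));
    store.setTimer("familyA", "t1", Instant.ofEpochMilli(2000)); // overwrites the first
    store.setTimer("familyB", "t1", Instant.ofEpochMilli(3000)); // same id, other family
    System.out.println(store.size());
    System.out.println(store.firingTime("familyA", "t1"));
  }
}
```

If the key were the timer id alone, the third call would overwrite the second, which is the kind of collision a map keyed only by timer id would produce once families exist.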
>>>>>>>>>>>> >
>>>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Reuven
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía
>>>>>>>>>>>> > <ieme...@gmail.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>>>>> >
>>>>>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>>>>>> >     all).
>>>>>>>>>>>> >
>>>>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>>>>> >       (e.g. Spark)?
>>>>>>>>>>>> >
>>>>>>>>>>>> >     It may be worth adding this to the website's Compatibility
>>>>>>>>>>>> >     Matrix.
>>>>>>>>>>>> >
>>>>>>>>>>>> >     Regards,
>>>>>>>>>>>> >     Ismaël
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>>>>> >     <rehman.murad...@venturedive.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>>>>>> >         development process. I am delighted to contribute my two
>>>>>>>>>>>> >         cents to the Beam project.
>>>>>>>>>>>> >
>>>>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>>>>> >
>>>>>>>>>>>> >         Thanks & Regards
>>>>>>>>>>>> >
>>>>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>>>>> >         Software Engineer
>>>>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax
>>>>>>>>>>>> >         <re...@google.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>>>>>> >             supports dynamic timers. As a reminder, this was
>>>>>>>>>>>> >             discussed on the dev list some time back.
>>>>>>>>>>>> >
>>>>>>>>>>>> >             As background, previously you had to statically
>>>>>>>>>>>> >             declare all timers in your code. So if you wanted to
>>>>>>>>>>>> >             have two timers, you needed to create two timer
>>>>>>>>>>>> >             variables and two callbacks, one for each timer. A
>>>>>>>>>>>> >             number of users kept hitting stumbling blocks where
>>>>>>>>>>>> >             they needed a dynamic set of timers (often based on
>>>>>>>>>>>> >             the element), which was not supported in Beam. The
>>>>>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>>>>>> >
>>>>>>>>>>>> >             The new support allows declaring a TimerMap, which is
>>>>>>>>>>>> >             a map of timers. Each TimerMap is scoped by a family
>>>>>>>>>>>> >             name, so you can create multiple TimerMaps, each with
>>>>>>>>>>>> >             its own callback. The use looks as follows:
>>>>>>>>>>>> >
>>>>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>>>> >
>>>>>>>>>>>> >                 @ProcessElement
>>>>>>>>>>>> >                 public void process(
>>>>>>>>>>>> >                         @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>>>>>>> >                     // timestamp: the Instant at which each timer should fire
>>>>>>>>>>>> >                     // (left undefined in this sketch)
>>>>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>>>>> >                 }
>>>>>>>>>>>> >
>>>>>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>>>>> >                 }
>>>>>>>>>>>> >             }
>>>>>>>>>>>> >
>>>>>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>>>>>> >             runners.
>>>>>>>>>>>> >
>>>>>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>>>>>> >             will find it very valuable.
>>>>>>>>>>>> >
>>>>>>>>>>>> >             Reuven
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
