This sounds like a useful feature, if I understand it: a generic transform
(built on a generic stateful DoFn) where the end-user provides a monotonic
predicate over the input it has seen. It emits a notification exactly once
when the predicate is first satisfied. To be efficient, it will also need
some form of summarization over the input seen.

    Notify
      .withSummarizer(combineFn)
      .withPredicate(summary -> ...)

Something like that? The complexity is not much less than just writing a
stateful DoFn directly, but the boilerplate is much less.
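
For Steve's threshold example below, usage might look something like this
(the element type, key, and threshold are placeholders, and Notify itself is
hypothetical, just to make the shape concrete):

    events.apply(
        Notify
            .withSummarizer(Count.<Event>combineFn())
            .withPredicate(count -> count >= 1000))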

Kenn

On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz <sniem...@apache.org> wrote:

> Interestingly enough, we just had a use case come up that I think could
> have been solved by finishing triggers.
>
> Basically, we want to emit a notification when a certain threshold is
> reached (in this case, we saw at least N elements for a given key), and
> then never notify again within that window. We can accomplish this using a
> stateful DoFn, as mentioned above, but I thought it was interesting that
> this just came up, and wanted to share.
>
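> A rough sketch of the kind of stateful DoFn this needs (key/value types,
> names, and the threshold N are illustrative only, not our actual code):
>
>     import org.apache.beam.sdk.state.StateSpec;
>     import org.apache.beam.sdk.state.StateSpecs;
>     import org.apache.beam.sdk.state.ValueState;
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.apache.beam.sdk.values.KV;
>
>     /** Notifies once per key and window when at least N elements are seen. */
>     class NotifyAtThresholdFn extends DoFn<KV<String, Long>, KV<String, Long>> {
>       private static final long N = 1000L;  // illustrative threshold
>
>       @StateId("count")
>       private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();
>
>       @StateId("notified")
>       private final StateSpec<ValueState<Boolean>> notifiedSpec = StateSpecs.value();
>
>       @ProcessElement
>       public void process(
>           @Element KV<String, Long> element,
>           @StateId("count") ValueState<Long> count,
>           @StateId("notified") ValueState<Boolean> notified,
>           OutputReceiver<KV<String, Long>> out) {
>         if (Boolean.TRUE.equals(notified.read())) {
>           return;  // already notified for this key and window; stay quiet
>         }
>         Long current = count.read();
>         long updated = (current == null ? 0L : current) + 1;
>         count.write(updated);
>         if (updated >= N) {
>           notified.write(true);
>           out.output(KV.of(element.getKey(), updated));  // the one notification
>         }
>       }
>     }
>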
> Maybe it'd be worth building something into the SDK to simulate this?
>
> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> By the way, adding this guard uncovered two bugs in Beam's Java codebase,
>> luckily only in benchmarks and tests. There were *no* non-buggy instances of
>> a finishing trigger. Both declare an allowed lateness that is never used.
>>
>> Nexmark query 10:
>>
>>         // Clear fancy triggering from above.
>>         .apply(
>>             Window.<KV<Void, OutputFile>>into(...)
>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>                 // We expect no late data here, but we'll assume the worst
>>                 // so we can detect any.
>>                 .withAllowedLateness(Duration.standardDays(1))
>>                 .discardingFiredPanes())
>>
>> This is nonsensical: the trigger will fire once and close, never firing
>> again. So the allowed lateness has no effect except to change counters from
>> "dropped due to lateness" to "dropped due to trigger closing". The intent
>> would appear to be to restore the default triggering, but it failed.
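>>
>> If restoring the default triggering was indeed the intent, something like
>> this sketch would presumably do it and make the allowed lateness meaningful:
>>
>>         .apply(
>>             Window.<KV<Void, OutputFile>>into(...)
>>                 .triggering(DefaultTrigger.of())
>>                 .withAllowedLateness(Duration.standardDays(1))
>>                 .discardingFiredPanes())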
>>
>> PipelineTranslationTest:
>>
>>            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
>>                 .triggering(
>>                     AfterWatermark.pastEndOfWindow()
>>                         .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
>>                 .accumulatingFiredPanes()
>>                 .withAllowedLateness(Duration.standardMinutes(3L)));
>>
>> Again, the allowed lateness has no effect. This test only exercises the
>> portable proto round-trip, but it is still odd to write a nonsensical
>> pipeline for it.
>>
>> Takeaway: even experienced Beam developers have no real use for this
>> pattern, yet they still write it by accident and create pipelines that
>> would have data loss bugs because of it.
>>
>> Since there is no other discussion here, I will trust the community is OK
>> with this change and follow Jan's review of my implementation of his idea.
>>
>> Kenn
>>
>>
>> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Opened https://github.com/apache/beam/pull/9960 for this idea. This
>>> will alert users to broken pipelines and force them to alter them.
>>>
>>> Kenn
>>>
>>> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Kenn,
>>>>>
>>>>> does there still remain some use for a trigger to finish? If we don't
>>>>> drop data, would it still be of any use to users? If not, would it be
>>>>> better to just remove the functionality completely, so that users who
>>>>> use it (and it will possibly break for them) are aware of it at compile
>>>>> time?
>>>>>
>>>>> Jan
>>>>>
>>>>
>>>> Good point. I believe there is no good use for a top-level trigger
>>>> finishing. As mentioned, the intended uses aren't really met by triggers,
>>>> but are met by stateful DoFn.
>>>>
>>>> Eugene's bug even has this title :-). We could leave the behavior
>>>> unchanged and just reject pipelines with broken top-level triggers. This
>>>> is probably a better solution: if a user has a broken trigger, the new
>>>> behavior is probably not enough to magically fix their pipeline. They are
>>>> better off knowing that it is broken and fixing it.
>>>>
>>>> And at that point, there is a lot of dead code and my PR is really just
>>>> cleaning it up as a simplification.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>
>>>>> On 10/30/19 11:26 PM, Kenneth Knowles wrote:
>>>>> > Problem: a trigger can "finish" which causes a window to "close" and
>>>>> > drop all remaining data arriving for that window.
>>>>> >
>>>>> > This has been discussed many times and I thought fixed, but it seems
>>>>> > to not be fixed. It does not seem to have its own Jira or thread that
>>>>> > I can find. But here are some pointers:
>>>>> >
>>>>> >  - data loss bug:
>>>>> >    https://lists.apache.org/thread.html/ce413231d0b7d52019668765186ef27a7ffb69b151fdb34f4bf80b0f@%3Cdev.beam.apache.org%3E
>>>>> >  - user hitting the bug:
>>>>> >    https://lists.apache.org/thread.html/28879bc80cd5c7ef1a3e38cb1d2c063165d40c13c02894bbccd66aca@%3Cuser.beam.apache.org%3E
>>>>> >  - user confusion:
>>>>> >    https://lists.apache.org/thread.html/2707aa449c8c6de1c6e3e8229db396323122304c14931c44d0081449@%3Cuser.beam.apache.org%3E
>>>>> >  - thread from 2016 on the topic:
>>>>> >    https://lists.apache.org/thread.html/5f44b62fdaf34094ccff8da2a626b7cd344d29a8a0fff6eac8e148ea@%3Cdev.beam.apache.org%3E
>>>>> >
>>>>> > In theory, trigger finishing was intended for users who can get their
>>>>> > answers from a smaller amount of data and then drop the rest. In
>>>>> > practice, triggers aren't really expressive enough for this. Stateful
>>>>> > DoFn is the solution for these cases.
>>>>> >
>>>>> > I've opened https://github.com/apache/beam/pull/9942 which makes the
>>>>> > following changes:
>>>>> >
>>>>> >  - when a trigger says it is finished, it never fires again but data
>>>>> > is still kept
>>>>> >  - at GC time the final output will be emitted
>>>>> >
>>>>> > As with all bugfixes, this is backwards-incompatible (if your pipeline
>>>>> > relies on buggy behavior, it will stop working). So this is a major
>>>>> > change that I wanted to discuss on dev@.
>>>>> >
>>>>> > Kenn
>>>>> >
>>>>>
>>>>
