This sounds like a useful feature, if I understand it: a generic transform (built on a generic stateful DoFn) where the end-user provides a monotonic predicate over the input it has seen. It emits a notification exactly once, when the predicate is first satisfied. To be efficient, it will also need some form of summarization over the input seen.
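To make the idea concrete, here is a minimal sketch of the notify-once logic in plain Java. It is not Beam code and not a proposed API: the class name `NotifyOnce`, the method `offer`, and the use of a `BiFunction` as the summarizer are all hypothetical, chosen only for illustration. In a real stateful DoFn the summary and the "already notified" flag would live in per-key-and-window state rather than in fields.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/** Hypothetical model of notify-once state for a single key and window; not a Beam API. */
final class NotifyOnce<T, S> {
  private final BiFunction<S, T, S> summarizer; // folds one input element into the summary
  private final Predicate<S> predicate;         // assumed monotonic: once true, it stays true
  private S summary;
  private boolean notified = false;

  NotifyOnce(S initialSummary, BiFunction<S, T, S> summarizer, Predicate<S> predicate) {
    this.summary = initialSummary;
    this.summarizer = summarizer;
    this.predicate = predicate;
  }

  /** Adds one element; returns the summary the first time the predicate holds, else empty. */
  Optional<S> offer(T element) {
    if (notified) {
      return Optional.empty(); // already notified: never notify again
    }
    summary = summarizer.apply(summary, element);
    if (predicate.test(summary)) {
      notified = true;
      return Optional.of(summary);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Threshold use case: notify when at least 3 elements have been seen.
    NotifyOnce<String, Integer> atLeastThree =
        new NotifyOnce<String, Integer>(0, (count, e) -> count + 1, count -> count >= 3);
    for (String e : new String[] {"a", "b", "c", "d"}) {
      atLeastThree.offer(e).ifPresent(c -> System.out.println("threshold reached, count = " + c));
    }
    // prints "threshold reached, count = 3" exactly once
  }
}
```

The summary here is just a running count, which keeps the state small no matter how many elements arrive. That is the point of asking for a summarizer instead of buffering the raw input.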
Notify
    .withSummarizer(combineFn)
    .withPredicate(summary -> ...)

Something like that? The complexity is not much less than just writing a stateful DoFn directly, but the boilerplate is much less.

Kenn

On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz <sniem...@apache.org> wrote:

> Interestingly enough, we just had a use case come up that I think could
> have been solved by finishing triggers.
>
> Basically, we want to emit a notification when a certain threshold is
> reached (in this case, we saw at least N elements for a given key), and
> then never notify again within that window. As mentioned, we can
> accomplish this using a stateful DoFn as mentioned above, but I thought it
> was interesting that this just came up, and wanted to share.
>
> Maybe it'd be worth building something to simulate this into the SDK?
>
> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> By the way, adding this guard uncovered two bugs in Beam's Java codebase,
>> luckily only benchmarks and tests. There were *no* non-buggy instances of a
>> finishing trigger. They both declare allowed lateness that is never used.
>>
>> Nexmark query 10:
>>
>>     // Clear fancy triggering from above.
>>     .apply(
>>         Window.<KV<Void, OutputFile>>into(...)
>>             .triggering(AfterWatermark.pastEndOfWindow())
>>             // We expect no late data here, but we'll assume the worst so we can detect any.
>>             .withAllowedLateness(Duration.standardDays(1))
>>             .discardingFiredPanes())
>>
>> This is nonsensical: the trigger will fire once and close, never firing
>> again. So the allowed lateness has no effect except to change counters from
>> "dropped due to lateness" to "dropped due to trigger closing". The intent
>> would appear to be to restore the default triggering, but it failed.
>>
>> PipelineTranslationTest:
>>
>>     Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
>>         .triggering(
>>             AfterWatermark.pastEndOfWindow()
>>                 .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
>>         .accumulatingFiredPanes()
>>         .withAllowedLateness(Duration.standardMinutes(3L)));
>>
>> Again, the allowed lateness has no effect. This test is just to test
>> portable proto round-trip. But still it is odd to write a nonsensical
>> pipeline for this.
>>
>> Takeaway: experienced Beam developers never use this pattern, but they
>> still get it wrong and create pipelines that would have data loss bugs
>> because of it.
>>
>> Since there is no other discussion here, I will trust the community is OK
>> with this change and follow Jan's review of my implementation of his idea.
>>
>> Kenn
>>
>>
>> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Opened https://github.com/apache/beam/pull/9960 for this idea. This
>>> will alert users to broken pipelines and force them to alter them.
>>>
>>> Kenn
>>>
>>> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Kenn,
>>>>>
>>>>> does there still remain some use for trigger to finish? If we don't drop
>>>>> data, would it still be of any use to users? If not, would it be better
>>>>> to just remove the functionality completely, so that users who use it
>>>>> (and it will possibly break for them) are aware of it at compile time?
>>>>>
>>>>> Jan
>>>>>
>>>>
>>>> Good point. I believe there is no good use for a top-level trigger
>>>> finishing. As mentioned, the intended uses aren't really met by triggers,
>>>> but are met by stateful DoFn.
>>>>
>>>> Eugene's bug even has this title :-). We could not change any behavior
>>>> but just reject pipelines with broken top-level triggers. This is probably
>>>> a better solution.
>>>> Because if a user has a broken trigger, the new behavior
>>>> is probably not enough to magically fix their pipeline. They are better off
>>>> knowing that they are broken and fixing it.
>>>>
>>>> And at that point, there is a lot of dead code and my PR is really just
>>>> cleaning it up as a simplification.
>>>>
>>>> Kenn
>>>>
>>>>> On 10/30/19 11:26 PM, Kenneth Knowles wrote:
>>>>> > Problem: a trigger can "finish" which causes a window to "close" and
>>>>> > drop all remaining data arriving for that window.
>>>>> >
>>>>> > This has been discussed many times and I thought fixed, but it seems
>>>>> > to not be fixed. It does not seem to have its own Jira or thread that
>>>>> > I can find. But here are some pointers:
>>>>> >
>>>>> >  - data loss bug:
>>>>> >    https://lists.apache.org/thread.html/ce413231d0b7d52019668765186ef27a7ffb69b151fdb34f4bf80b0f@%3Cdev.beam.apache.org%3E
>>>>> >  - user hitting the bug:
>>>>> >    https://lists.apache.org/thread.html/28879bc80cd5c7ef1a3e38cb1d2c063165d40c13c02894bbccd66aca@%3Cuser.beam.apache.org%3E
>>>>> >  - user confusion:
>>>>> >    https://lists.apache.org/thread.html/2707aa449c8c6de1c6e3e8229db396323122304c14931c44d0081449@%3Cuser.beam.apache.org%3E
>>>>> >  - thread from 2016 on the topic:
>>>>> >    https://lists.apache.org/thread.html/5f44b62fdaf34094ccff8da2a626b7cd344d29a8a0fff6eac8e148ea@%3Cdev.beam.apache.org%3E
>>>>> >
>>>>> > In theory, trigger finishing was intended for users who can get their
>>>>> > answers from a smaller amount of data and then drop the rest. In
>>>>> > practice, triggers aren't really expressive enough for this. Stateful
>>>>> > DoFn is the solution for these cases.
>>>>> >
>>>>> > I've opened https://github.com/apache/beam/pull/9942 which makes the
>>>>> > following changes:
>>>>> >
>>>>> >  - when a trigger says it is finished, it never fires again but data
>>>>> >    is still kept
>>>>> >  - at GC time the final output will be emitted
>>>>> >
>>>>> > As with all bugfixes, this is backwards-incompatible (if your pipeline
>>>>> > relies on buggy behavior, it will stop working). So this is a major
>>>>> > change that I wanted to discuss on dev@.
>>>>> >
>>>>> > Kenn