And another: https://stackoverflow.com/questions/55748746/issues-with-dynamic-destinations-in-dataflow
On Thu, Nov 14, 2019 at 1:35 AM Kenneth Knowles <k...@apache.org> wrote:
>
> On Fri, Nov 8, 2019 at 9:44 AM Steve Niemitz <sniem...@apache.org> wrote:
>
>> Yeah, that looks like what I had in mind too. I think the most useful
>> notification output would be a KV of (K, summary)?
>
> Sounds about right. Some use cases may not care about the summary, but
> just the notification. But for most runners, passing extra in-memory data
> to a subsequent projection which drops it is essentially free.
>
> Kenn
>
>> On Fri, Nov 8, 2019 at 12:38 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This sounds like a useful feature, if I understand it: a generic
>>> transform (built on a generic stateful DoFn) where the end user provides
>>> a monotonic predicate over the input seen so far. It emits a notification
>>> exactly once, when the predicate is first satisfied. To be efficient, it
>>> will also need some form of summarization over the input seen.
>>>
>>>     Notify
>>>         .withSummarizer(combineFn)
>>>         .withPredicate(summary -> ...)
>>>
>>> Something like that? The complexity is not much less than writing a
>>> stateful DoFn directly, but the boilerplate is much less.
>>>
>>> Kenn
>>>
>>> On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>
>>>> Interestingly enough, we just had a use case come up that I think
>>>> could have been solved by finishing triggers.
>>>>
>>>> Basically, we want to emit a notification when a certain threshold is
>>>> reached (in this case, when we have seen at least N elements for a given
>>>> key), and then never notify again within that window. As mentioned
>>>> above, we can accomplish this using a stateful DoFn, but I thought it
>>>> was interesting that this use case just came up, and wanted to share.
>>>>
>>>> Maybe it'd be worth building something like this into the SDK?
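The Notify idea discussed above (a per-key summary maintained by a combine function, plus a monotonic predicate that fires exactly once when first satisfied) can be sketched as a small state machine in plain Java. This is a hypothetical illustration of the state a stateful DoFn would keep, not Beam API code; the names `NotifyOnce`, `process`, and `notifications` are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

// Models the per-key state of the proposed Notify transform: a summary
// (the combineFn's accumulator) plus a "fired" flag so the notification
// (K, summary) is emitted exactly once per key.
class NotifyOnce<K, S> {
  private final S identity;                 // empty summary
  private final BinaryOperator<S> combine;  // the "combineFn"
  private final Predicate<S> predicate;     // monotonic predicate over the summary

  private final Map<K, S> summaries = new HashMap<>();
  private final Map<K, Boolean> fired = new HashMap<>();
  private final List<Map.Entry<K, S>> notifications = new ArrayList<>();

  NotifyOnce(S identity, BinaryOperator<S> combine, Predicate<S> predicate) {
    this.identity = identity;
    this.combine = combine;
    this.predicate = predicate;
  }

  // Corresponds to processElement in a stateful DoFn keyed by K.
  void process(K key, S contribution) {
    S updated = combine.apply(summaries.getOrDefault(key, identity), contribution);
    summaries.put(key, updated);
    // Emit (K, summary) the first time the predicate holds; never again.
    if (!fired.getOrDefault(key, false) && predicate.test(updated)) {
      fired.put(key, true);
      notifications.add(Map.entry(key, updated));
    }
  }

  List<Map.Entry<K, S>> notifications() {
    return notifications;
  }

  public static void main(String[] args) {
    // Steve's use case: notify once per key when at least 3 elements were seen.
    NotifyOnce<String, Integer> notify =
        new NotifyOnce<>(0, Integer::sum, count -> count >= 3);
    for (int i = 0; i < 5; i++) {
      notify.process("a", 1);
    }
    notify.process("b", 1);
    System.out.println(notify.notifications()); // exactly one entry, for key "a"
  }
}
```

Note that because the predicate is required to be monotonic, the "fired" flag is the only extra state needed beyond the summary itself.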
>>>>
>>>> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> By the way, adding this guard uncovered two bugs in Beam's Java
>>>>> codebase, luckily only in benchmarks and tests. There were *no*
>>>>> non-buggy instances of a finishing trigger. Both declare an allowed
>>>>> lateness that is never used.
>>>>>
>>>>> Nexmark query 10:
>>>>>
>>>>>     // Clear fancy triggering from above.
>>>>>     .apply(
>>>>>         Window.<KV<Void, OutputFile>>into(...)
>>>>>             .triggering(AfterWatermark.pastEndOfWindow())
>>>>>             // We expect no late data here, but we'll assume the
>>>>>             // worst so we can detect any.
>>>>>             .withAllowedLateness(Duration.standardDays(1))
>>>>>             .discardingFiredPanes())
>>>>>
>>>>> This is nonsensical: the trigger will fire once and close, never
>>>>> firing again. So the allowed lateness has no effect except to change
>>>>> counters from "dropped due to lateness" to "dropped due to trigger
>>>>> closing". The intent appears to have been to restore the default
>>>>> triggering, but it failed.
>>>>>
>>>>> PipelineTranslationTest:
>>>>>
>>>>>     Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
>>>>>         .triggering(
>>>>>             AfterWatermark.pastEndOfWindow()
>>>>>                 .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
>>>>>         .accumulatingFiredPanes()
>>>>>         .withAllowedLateness(Duration.standardMinutes(3L)));
>>>>>
>>>>> Again, the allowed lateness has no effect. This test only checks the
>>>>> portable proto round-trip, but it is still odd to write a nonsensical
>>>>> pipeline for it.
>>>>>
>>>>> Takeaway: experienced Beam developers never use this pattern on
>>>>> purpose, yet they still get it wrong and create pipelines that would
>>>>> have data-loss bugs because of it.
>>>>>
>>>>> Since there is no other discussion here, I will trust that the
>>>>> community is OK with this change and follow Jan's review of my
>>>>> implementation of his idea.
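Why the Nexmark snippet above loses data can be seen in a toy model of the window lifecycle. This plain-Java sketch is not the Beam trigger implementation (all names are invented): a bare `AfterWatermark.pastEndOfWindow()` trigger finishes after its on-time firing and closes the window, so every later element is dropped regardless of the configured allowed lateness, whereas a repeating trigger leaves the window open.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a window whose trigger may "finish". With repeated=false
// (modeling a bare AfterWatermark trigger) the window closes after the
// on-time pane, so allowed lateness has no effect: late elements are
// dropped due to the trigger closing, not due to lateness.
class FinishingTriggerModel {
  private final boolean repeated;              // true ~ a repeating/default-style trigger
  private final List<Integer> pane = new ArrayList<>();
  private final List<List<Integer>> firedPanes = new ArrayList<>();
  private boolean closed = false;
  int droppedDueToTriggerClosing = 0;

  FinishingTriggerModel(boolean repeated) { this.repeated = repeated; }

  void element(int value) {
    if (closed) {
      droppedDueToTriggerClosing++;            // the data loss discussed in the thread
      return;
    }
    pane.add(value);
  }

  void watermarkPassesEndOfWindow() {
    firedPanes.add(new ArrayList<>(pane));     // on-time firing
    pane.clear();                              // discardingFiredPanes
    if (!repeated) closed = true;              // the trigger finished; window closes
  }

  void windowGarbageCollected() {
    if (!closed && !pane.isEmpty()) firedPanes.add(new ArrayList<>(pane));
  }

  List<List<Integer>> firedPanes() { return firedPanes; }
}
```

In this model, setting a large allowed lateness on the non-repeated variant changes nothing: the element is refused because the window is closed, which is exactly the counter shift ("dropped due to lateness" to "dropped due to trigger closing") described above.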
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> Opened https://github.com/apache/beam/pull/9960 for this idea. This
>>>>>> will alert users to broken pipelines and force them to alter them.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> Hi Kenn,
>>>>>>>>
>>>>>>>> does there still remain any use for a trigger to finish? If we
>>>>>>>> don't drop data, would it still be of any use to users? If not,
>>>>>>>> would it be better to just remove the functionality completely, so
>>>>>>>> that users who use it (and whose pipelines will possibly break) are
>>>>>>>> made aware of it at compile time?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>
>>>>>>> Good point. I believe there is no good use for a top-level trigger
>>>>>>> finishing. As mentioned, the intended uses aren't really met by
>>>>>>> triggers, but are met by stateful DoFn.
>>>>>>>
>>>>>>> Eugene's bug even has this title :-). We could leave the behavior
>>>>>>> unchanged and just reject pipelines with broken top-level triggers.
>>>>>>> This is probably a better solution, because if a user has a broken
>>>>>>> trigger, the new behavior is probably not enough to magically fix
>>>>>>> their pipeline. They are better off knowing that they are broken and
>>>>>>> fixing it.
>>>>>>>
>>>>>>> And at that point, there is a lot of dead code, and my PR is really
>>>>>>> just cleaning it up as a simplification.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>> On 10/30/19 11:26 PM, Kenneth Knowles wrote:
>>>>>>>> > Problem: a trigger can "finish", which causes a window to "close"
>>>>>>>> > and drop all remaining data arriving for that window.
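The "reject at construction time" approach discussed above amounts to a validation pass over the windowing strategy. The following plain-Java sketch is hypothetical (the `TriggerSpec` and `WindowingValidator` names and the error message are invented, not the actual Beam check): any top-level trigger that may finish is refused, and wrapping it so it repeats forever makes it safe.

```java
// Sketch of a pipeline-construction-time guard: reject any top-level
// trigger that may finish, since a finished trigger silently drops data.
// TriggerSpec and WindowingValidator are invented for illustration.
class TriggerSpec {
  final String description;
  final boolean mayFinish;

  TriggerSpec(String description, boolean mayFinish) {
    this.description = description;
    this.mayFinish = mayFinish;
  }

  // Wrapping any trigger so it repeats forever yields one that never finishes.
  TriggerSpec repeatedlyForever() {
    return new TriggerSpec("Repeatedly.forever(" + description + ")", false);
  }
}

class WindowingValidator {
  static void validate(TriggerSpec topLevelTrigger) {
    if (topLevelTrigger.mayFinish) {
      throw new IllegalArgumentException(
          "Unsafe trigger " + topLevelTrigger.description
              + " may lose data; wrap it so it repeats forever");
    }
  }
}
```

The appeal of this design is that the broken pipeline fails loudly at construction time, instead of silently dropping data at runtime, which matches Jan's point about surfacing the breakage to users early.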
>>>>>>>> >
>>>>>>>> > This has been discussed many times and I thought it was fixed,
>>>>>>>> > but it seems it is not. It does not seem to have its own Jira or
>>>>>>>> > thread that I can find, but here are some pointers:
>>>>>>>> >
>>>>>>>> > - data loss bug:
>>>>>>>> > https://lists.apache.org/thread.html/ce413231d0b7d52019668765186ef27a7ffb69b151fdb34f4bf80b0f@%3Cdev.beam.apache.org%3E
>>>>>>>> > - user hitting the bug:
>>>>>>>> > https://lists.apache.org/thread.html/28879bc80cd5c7ef1a3e38cb1d2c063165d40c13c02894bbccd66aca@%3Cuser.beam.apache.org%3E
>>>>>>>> > - user confusion:
>>>>>>>> > https://lists.apache.org/thread.html/2707aa449c8c6de1c6e3e8229db396323122304c14931c44d0081449@%3Cuser.beam.apache.org%3E
>>>>>>>> > - thread from 2016 on the topic:
>>>>>>>> > https://lists.apache.org/thread.html/5f44b62fdaf34094ccff8da2a626b7cd344d29a8a0fff6eac8e148ea@%3Cdev.beam.apache.org%3E
>>>>>>>> >
>>>>>>>> > In theory, trigger finishing was intended for users who can get
>>>>>>>> > their answer from a smaller amount of data and then drop the
>>>>>>>> > rest. In practice, triggers aren't really expressive enough for
>>>>>>>> > this. Stateful DoFn is the solution for these cases.
>>>>>>>> >
>>>>>>>> > I've opened https://github.com/apache/beam/pull/9942 which makes
>>>>>>>> > the following changes:
>>>>>>>> >
>>>>>>>> > - when a trigger says it is finished, it never fires again, but
>>>>>>>> > data is still kept
>>>>>>>> > - at GC time the final output will be emitted
>>>>>>>> >
>>>>>>>> > As with all bug fixes, this is backwards-incompatible (if your
>>>>>>>> > pipeline relies on the buggy behavior, it will stop working). So
>>>>>>>> > this is a major change that I wanted to discuss on dev@.
>>>>>>>> >
>>>>>>>> > Kenn
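The two-part behavior change proposed above can be sketched as a small model in plain Java (names invented; the real change lives in the runners' trigger machinery): a finished trigger stops firing, but elements arriving afterwards are now buffered instead of dropped, and a final pane is emitted when the window is garbage-collected.

```java
import java.util.ArrayList;
import java.util.List;

// Models the proposed semantics: after the trigger finishes, elements are
// buffered rather than dropped, and the final pane is emitted at GC time.
class KeptDataWindowModel {
  private final List<Integer> buffered = new ArrayList<>();
  private final List<List<Integer>> panes = new ArrayList<>();
  private boolean triggerFinished = false;

  void element(int value) {
    buffered.add(value);              // the old (buggy) behavior dropped this once finished
  }

  void triggerFiresAndFinishes() {
    if (triggerFinished) return;      // a finished trigger never fires again
    panes.add(new ArrayList<>(buffered));
    buffered.clear();
    triggerFinished = true;
  }

  // "At GC time the final output will be emitted" -- nothing is lost.
  void windowGarbageCollected() {
    if (!buffered.isEmpty()) {
      panes.add(new ArrayList<>(buffered));
      buffered.clear();
    }
  }

  List<List<Integer>> panes() { return panes; }
}
```

The backwards incompatibility noted in the email is visible here too: a pipeline that relied on the post-finish data disappearing would now see an extra final pane at GC time.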