Just for the thread, I want to comment on another, more drastic approach: eliminating continuation triggers from the model entirely and leaving downstream triggering up to the runner. This approach is not viable, because transforms may need to change their behavior based on whether a trigger will fire more than once. Transforms can and do inspect the windowing strategy of their input and do things differently based on it.
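For concreteness, here is a minimal sketch (my own illustration, not Beam-internal code; the helper name is made up) of the kind of inspection a Java transform can do on its input to decide whether the upstream triggering guarantees a single pane per key+window:

import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;

final class TriggerChecks {
  private TriggerChecks() {}

  // Returns true if the input's windowing strategy guarantees at most one pane
  // per key+window (default trigger, zero allowed lateness), so a downstream
  // transform may treat each output as the final result for its window.
  static boolean producesSinglePane(PCollection<?> input) {
    WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
    Trigger trigger = strategy.getTrigger();
    return trigger instanceof DefaultTrigger
        && strategy.getAllowedLateness().getMillis() == 0;
  }
}

A sink-like transform might use a check like this in its expand() to choose between a write-once path and an upsert-per-pane path, or to reject incompatible triggering at pipeline construction time. The point is just that the trigger visible on the input PCollection is part of the contract a transform relies on, so the model cannot leave downstream triggering entirely unspecified.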
Kenn

On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:

> I'll say that synchronized processing time has confused users before.
> Users sometimes use processing-time triggers to optimize latency, banking
> that that will decouple stage latency from the long-tail latency of
> previous stages. However continuation triggers silently switching to
> synchronized processing time has defeated that, and it wasn't clear to
> users why.
>
> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Of course the right answer is to just implement sink triggers and
>>>> sidestep the question altogether :).
>>>>
>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime in the
>>>> model makes the most sense, and runners can choose an implementation
>>>> between firing eagerly and waiting some amount of time until they think
>>>> all (most?) downstream results are in before firing, depending on how
>>>> smart the runner wants to be. As you point out, they're all correct, and
>>>> we'll have multiple firings due to the upstream trigger anyway, and this
>>>> is safer than it used to be (though still possibly requires work).
>>>>
>>>
>>> Just to clarify, as I got a little confused, is your suggestion: Leave
>>> AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK
>>> put them in where they want, and let runners decide how to interpret them?
>>> (this SGTM and requires the least/no changes)
>>>
>>
>> Yep. We may want to update Python/Go to produce
>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
>> eventually, to better express intent.
>>
>>
>>> Kenn
>>>
>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to
>>> this, except in implementation, and should be removed either way.
>>>
>>>
>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> TL;DR:
>>>>> 1. should we replace "after synchronized processing time" with "after
>>>>> count 1"?
>>>>> 2. should we remove "continuation trigger" and leave this to runners?
>>>>>
>>>>> ----
>>>>>
>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>>>> specific problem. They are inconsistent across SDKs today.
>>>>>
>>>>> - You have an aggregation/GBK with aligned processing time trigger
>>>>> like ("output every minute on the minute")
>>>>> - You have a downstream aggregation/GBK between that and the sink
>>>>> - You expect to have about one output every minute per key+window pair
>>>>>
>>>>> Any output of the upstream aggregation may contribute to any
>>>>> key+window of the downstream aggregation. The
>>>>> AfterSynchronizedProcessingTime trigger waits for all the processing
>>>>> time based triggers to fire and commit their outputs. The downstream
>>>>> aggregation will output as fast as possible in panes consistent with
>>>>> the upstream aggregation.
>>>>>
>>>>> - The Java SDK behavior is as above, to output "as fast as reasonable".
>>>>> - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>> triggers but simply propagates the same trigger to the next GBK,
>>>>> creating additional delay.
>>>>> - I don't know what the Go SDK may do, if it supports this at all.
>>>>>
>>>>> Any behavior could be defined as "correct". A simple option could be
>>>>> to have the downstream aggregation "fire always" aka "after element
>>>>> count 1". How would this change things? We would potentially see many
>>>>> more outputs.
>>>>>
>>>>> Why did we do this in the first place? There are (at least) these
>>>>> reasons:
>>>>>
>>>>> - Previously, triggers could "finish" an aggregation thus dropping
>>>>> all further data. In this case, waiting for all outputs is critical or
>>>>> else you lose data. Now triggers cannot finish aggregations.
>>>>> - Whenever there may be more than one pane, a user has to write logic
>>>>> to compensate and deal with it. Changing from guaranteed single pane to
>>>>> multi-pane would break things. So if the user configures a single
>>>>> firing, all downstream aggregations must respect it. Now that triggers
>>>>> cannot finish, I think processing time can only be used in multi-pane
>>>>> contexts anyhow.
>>>>> - The above example illustrates how the behavior in Java maintains
>>>>> something that the user will expect. Or so we think. Maybe users don't
>>>>> care.
>>>>>
>>>>> How did we get into this inconsistent state? When the user specifies
>>>>> triggering it applies to the very nearest aggregation/GBK. The SDK
>>>>> decides what triggering to insert downstream. One possibility is to
>>>>> remove this and have it unspecified, left to runner behavior.
>>>>>
>>>>> I think maybe these pieces of complexity are both not helpful and also
>>>>> not (necessarily) breaking changes to alter, especially considering we
>>>>> have inconsistency in the model.
>>>>>
>>>>> WDYT? And I wonder what this means for xlang and portability... how
>>>>> does continuation triggering even work? (if at all)
>>>>>
>>>>> Kenn
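For anyone following along without the earlier context, here is a rough Java sketch of the pipeline shape described in the quoted TL;DR: an upstream aggregation with an aligned processing-time trigger feeding a second aggregation that receives the continuation trigger. The source, key space, and window/trigger durations are arbitrary placeholders, not anything prescribed in the thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class ContinuationTriggerExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Toy unbounded source: a steady stream of numbers, keyed into a small key space.
    PCollection<KV<String, Long>> events =
        p.apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
            .apply(
                MapElements.into(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
                    .via(n -> KV.of("key" + (n % 4), n)));

    // Upstream aggregation: the user explicitly asks for output about once a
    // minute, aligned to the minute, within hour-long windows.
    PCollection<KV<String, Long>> perKeyCounts =
        events
            .apply(
                Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardHours(1)))
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .alignedTo(Duration.standardMinutes(1))))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(Count.perKey());

    // Downstream aggregation: no trigger is written here, so the SDK inserts a
    // "continuation trigger" -- AfterSynchronizedProcessingTime in Java, the same
    // processing-time trigger again in Python -- which is the inconsistency under
    // discussion. The user expectation is roughly one pane per key+window per
    // minute here as well.
    PCollection<KV<String, Long>> rollup = perKeyCounts.apply(Sum.longsPerKey());

    p.run();
  }
}

As I read it, option 1 in the TL;DR would amount to the SDK inserting something like Repeatedly.forever(AfterPane.elementCountAtLeast(1)) as the downstream trigger instead of AfterSynchronizedProcessingTime, trading the roughly once-a-minute pane cadence for potentially many more, smaller panes.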