I would prefer to leave downstream triggering up to the runner (or, better, leave upstream triggering up to the runner, à la sink triggers), but one problem is that without an explicit AfterSynchronizedProcessingTime one can't tell whether a ProcessingTime trigger between two groupings is due to an explicit re-triggering between them or was inherited from the upstream grouping.
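To make the ambiguity concrete, here is a toy sketch (not Beam's actual API; the class and function names are invented for illustration) of why an explicit AfterSynchronizedProcessingTime marker matters: if the continuation of an AfterProcessingTime trigger were recorded as plain AfterProcessingTime, an inherited trigger and a user's explicit re-trigger would be indistinguishable.

```python
# Toy model of continuation triggers. Assumption: the Java SDK's behavior of
# mapping AfterProcessingTime to AfterSynchronizedProcessingTime downstream,
# as described earlier in this thread.

from dataclasses import dataclass


@dataclass(frozen=True)
class AfterProcessingTime:
    delay_secs: int


@dataclass(frozen=True)
class AfterSynchronizedProcessingTime:
    pass


def continuation(trigger):
    """Trigger inserted on the downstream GBK when the user set `trigger`
    upstream (mirroring the Java SDK's choice, per the thread above)."""
    if isinstance(trigger, AfterProcessingTime):
        return AfterSynchronizedProcessingTime()
    return trigger


# Case 1: user sets a processing-time trigger on the first GBK only;
# the second GBK gets the inherited continuation trigger.
inherited = continuation(AfterProcessingTime(delay_secs=60))

# Case 2: user explicitly re-triggers between the two GBKs.
explicit = AfterProcessingTime(delay_secs=60)

# With the explicit marker, a runner can tell the two cases apart.
assert inherited != explicit

# If continuation() instead just copied the trigger (as the Python SDK
# does today, per the thread), both cases would serialize identically.
```

This is only a model of the serialized windowing strategy, but it shows why dropping the distinct continuation trigger loses information the runner might want.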
On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:

> Just for the thread I want to comment on another, more drastic approach: eliminate continuation triggers from the model, leaving downstream triggering up to a runner. This approach is not viable because transforms may need to change their behavior based on whether or not a trigger will fire more than once. Transforms can and do inspect the windowing strategy to do things differently.
>
> Kenn
>
> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>
>> I'll say that synchronized processing time has confused users before. Users sometimes use processing-time triggers to optimize latency, banking that this will decouple stage latency from the long-tail latency of previous stages. However, continuation triggers silently switching to synchronized processing time has defeated that, and it wasn't clear to users why.
>>
>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> Of course the right answer is to just implement sink triggers and sidestep the question altogether :).
>>>>>
>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime in the model makes the most sense, and runners can choose an implementation between firing eagerly and waiting some amount of time until they think all (most?) downstream results are in before firing, depending on how smart the runner wants to be. As you point out, they're all correct, and we'll have multiple firings due to the upstream trigger anyway, and this is safer than it used to be (though still possibly requires work).
>>>>
>>>> Just to clarify, as I got a little confused, is your suggestion: leave AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK put them in where they want, and let runners decide how to interpret them? (This SGTM and requires the least/no changes.)
>>>
>>> Yep. We may want to update Python/Go to produce AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too, eventually, to better express intent.
>>>
>>>> Kenn
>>>>
>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to this, except in implementation, and should be removed either way.
>>>>
>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> TL;DR:
>>>>>> 1. Should we replace "after synchronized processing time" with "after count 1"?
>>>>>> 2. Should we remove "continuation trigger" and leave this to runners?
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a specific problem. They are inconsistent across SDKs today.
>>>>>>
>>>>>> - You have an aggregation/GBK with an aligned processing-time trigger (e.g. "output every minute on the minute").
>>>>>> - You have a downstream aggregation/GBK between that and the sink.
>>>>>> - You expect about one output per minute per key+window pair.
>>>>>>
>>>>>> Any output of the upstream aggregation may contribute to any key+window of the downstream aggregation. The AfterSynchronizedProcessingTime trigger waits for all the processing-time-based triggers to fire and commit their outputs. The downstream aggregation will output as fast as possible in panes consistent with the upstream aggregation.
>>>>>>
>>>>>> - The Java SDK behavior is as above: output "as fast as reasonable".
>>>>>> - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers but simply propagates the same trigger to the next GBK, creating additional delay.
>>>>>> - I don't know what the Go SDK may do, if it supports this at all.
>>>>>>
>>>>>> Any behavior could be defined as "correct". A simple option could be to have the downstream aggregation "fire always", aka "after element count 1". How would this change things? We would potentially see many more outputs.
>>>>>>
>>>>>> Why did we do this in the first place? There are (at least) these reasons:
>>>>>>
>>>>>> - Previously, triggers could "finish" an aggregation, thus dropping all further data. In that case, waiting for all outputs is critical or else you lose data. Now triggers cannot finish aggregations.
>>>>>> - Whenever there may be more than one pane, a user has to write logic to compensate for and deal with it. Changing from a guaranteed single pane to multi-pane would break things, so if the user configures a single firing, all downstream aggregations must respect it. Now that triggers cannot finish, I think processing time can only be used in multi-pane contexts anyhow.
>>>>>> - The above example illustrates how the behavior in Java maintains something that the user will expect. Or so we think. Maybe users don't care.
>>>>>>
>>>>>> How did we get into this inconsistent state? When the user specifies triggering, it applies to the very nearest aggregation/GBK. The SDK decides what triggering to insert downstream. One possibility is to remove this and leave it unspecified, up to runner behavior.
>>>>>>
>>>>>> I think these pieces of complexity are both unhelpful and also not (necessarily) breaking changes to alter, especially considering we have inconsistency in the model.
>>>>>>
>>>>>> WDYT? And I wonder what this means for xlang and portability... how does continuation triggering even work? (if at all)
>>>>>>
>>>>>> Kenn
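To quantify the "fire always" alternative raised in the quoted proposal, here is a minimal simulation (plain Python, not Beam code; the function names and round-id bookkeeping are invented for illustration) of downstream pane counts: "after count 1" fires once per arriving upstream pane, while a synchronized-processing-time trigger batches all upstream panes committed in the same firing round into one downstream pane.

```python
# Each upstream pane is tagged with the processing-time firing round that
# produced it. All panes land on a single downstream key+window.

def downstream_panes_after_count_1(upstream_panes):
    """'After element count 1': one downstream firing per upstream pane."""
    return len(upstream_panes)


def downstream_panes_synchronized(upstream_panes):
    """AfterSynchronizedProcessingTime (as described in the thread): wait
    until every upstream trigger in a round has fired and committed, then
    fire once, so one downstream pane per round."""
    return len({round_id for round_id, _key in upstream_panes})


# Three upstream keys each fire in rounds 1 and 2 (e.g. two "every minute
# on the minute" firings), all contributing to one downstream key+window.
panes = [(1, "a"), (1, "b"), (1, "c"),
         (2, "a"), (2, "b"), (2, "c")]

assert downstream_panes_after_count_1(panes) == 6  # many more outputs
assert downstream_panes_synchronized(panes) == 2   # ~one per minute
```

This matches the trade-off in the thread: "after count 1" is simple and always correct but can multiply the downstream output volume, while the synchronized trigger preserves the roughly one-pane-per-round cadence the user likely expected.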