On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com> wrote:
> I would prefer to leave downstream triggering up to the runner (or,
> better, leave upstream triggering up to the runner, a la sink
> triggers), but one problem is that without an explicit
> AfterSynchronizedProcessingTime one can't tell if the downstream
> ProcessingTime between two groupings is due to an explicit
> re-triggering between them or inherited from one to the other.

I mean to propose that there should be no triggering specified unless
due to explicit re-triggering.

(And BTW yes, I agree about sink triggers, but we need retractions and
probably some theoretical work before we can aim for that.)

Kenn

> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Just for the thread I want to comment on another, more drastic
>> approach: eliminate continuation triggers from the model, leaving
>> downstream triggering up to a runner. This approach is not viable
>> because transforms may need to change their behavior based on
>> whether or not a trigger will fire more than once. Transforms can
>> and do inspect the windowing strategy to do things differently.
>>
>> Kenn
>>
>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I'll say that synchronized processing time has confused users
>>> before. Users sometimes use processing-time triggers to optimize
>>> latency, banking that it will decouple stage latency from the
>>> long-tail latency of previous stages. However, continuation
>>> triggers silently switching to synchronized processing time
>>> defeated that, and it wasn't clear to users why.
>>>
>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> Of course the right answer is to just implement sink triggers
>>>>>> and sidestep the question altogether :).
>>>>>>
>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime
>>>>>> in the model makes the most sense, and runners can choose an
>>>>>> implementation between firing eagerly and waiting some amount of
>>>>>> time until they think all (most?) downstream results are in
>>>>>> before firing, depending on how smart the runner wants to be. As
>>>>>> you point out, they're all correct, and we'll have multiple
>>>>>> firings due to the upstream trigger anyway, and this is safer
>>>>>> than it used to be (though still possibly requires work).
>>>>>
>>>>> Just to clarify, as I got a little confused, is your suggestion:
>>>>> leave AfterSynchronizedProcessingTime* triggers in the
>>>>> model/proto, let the SDK put them in where they want, and let
>>>>> runners decide how to interpret them? (This SGTM and requires the
>>>>> least/no changes.)
>>>>
>>>> Yep. We may want to update Python/Go to produce
>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime
>>>> triggers too, eventually, to better express intent.
>>>>
>>>>> Kenn
>>>>>
>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
>>>>> related to this, except in implementation, and should be removed
>>>>> either way.
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> TL;DR:
>>>>>>> 1. Should we replace "after synchronized processing time" with
>>>>>>> "after count 1"?
>>>>>>> 2. Should we remove "continuation trigger" and leave this to
>>>>>>> runners?
>>>>>>>
>>>>>>> ----
>>>>>>>
>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to
>>>>>>> solve a specific problem. They are inconsistent across SDKs
>>>>>>> today.
>>>>>>>
>>>>>>> - You have an aggregation/GBK with an aligned processing-time
>>>>>>> trigger (like "output every minute on the minute").
>>>>>>> - You have a downstream aggregation/GBK between that and the
>>>>>>> sink.
>>>>>>> - You expect to have about one output every minute per
>>>>>>> key+window pair.
>>>>>>>
>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>> key+window of the downstream aggregation. The
>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the
>>>>>>> processing-time-based triggers to fire and commit their
>>>>>>> outputs. The downstream aggregation will output as fast as
>>>>>>> possible in panes consistent with the upstream aggregation.
>>>>>>>
>>>>>>> - The Java SDK behavior is as above: output "as fast as
>>>>>>> reasonable".
>>>>>>> - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>> triggers but simply propagates the same trigger to the next
>>>>>>> GBK, creating additional delay.
>>>>>>> - I don't know what the Go SDK may do, if it supports this at
>>>>>>> all.
>>>>>>>
>>>>>>> Any behavior could be defined as "correct". A simple option
>>>>>>> would be to have the downstream aggregation "fire always", aka
>>>>>>> "after element count 1". How would this change things? We would
>>>>>>> potentially see many more outputs.
>>>>>>>
>>>>>>> Why did we do this in the first place? There are (at least)
>>>>>>> these reasons:
>>>>>>>
>>>>>>> - Previously, triggers could "finish" an aggregation, thus
>>>>>>> dropping all further data. In this case, waiting for all
>>>>>>> outputs is critical or else you lose data. Now triggers cannot
>>>>>>> finish aggregations.
>>>>>>> - Whenever there may be more than one pane, a user has to write
>>>>>>> logic to compensate and deal with it. Changing from a
>>>>>>> guaranteed single pane to multi-pane would break things. So if
>>>>>>> the user configures a single firing, all downstream
>>>>>>> aggregations must respect it. Now that triggers cannot finish,
>>>>>>> I think processing time can only be used in multi-pane contexts
>>>>>>> anyhow.
>>>>>>> - The above example illustrates how the behavior in Java
>>>>>>> maintains something that the user will expect. Or so we think.
>>>>>>> Maybe users don't care.
>>>>>>>
>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>> specifies triggering, it applies to the very nearest
>>>>>>> aggregation/GBK. The SDK decides what triggering to insert
>>>>>>> downstream. One possibility is to remove this and have it
>>>>>>> unspecified, left to runner behavior.
>>>>>>>
>>>>>>> I think maybe these pieces of complexity are both not helpful
>>>>>>> and also not (necessarily) breaking changes to alter,
>>>>>>> especially considering we have inconsistency in the model.
>>>>>>>
>>>>>>> WDYT? And I wonder what this means for xlang and
>>>>>>> portability... how does continuation triggering even work? (If
>>>>>>> at all.)
>>>>>>>
>>>>>>> Kenn
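
To make the trade-off discussed above concrete, here is a small, self-contained Python sketch (this is not the Beam API; all names and the pane model are invented for illustration). It contrasts the two downstream behaviors debated in the thread: an eager "after count 1" trigger, which emits one downstream pane per upstream pane, versus a synchronized trigger, which waits until every upstream pane produced by the same processing-time firing has committed and then emits once.

```python
from dataclasses import dataclass

@dataclass
class UpstreamPane:
    """One output pane of the upstream aggregation (hypothetical model)."""
    fire_time: int    # processing-time instant the upstream trigger fired
    commit_time: int  # when this pane's output was actually committed
    value: int

def eager_panes(panes):
    """'After count 1': the downstream aggregation fires once per
    upstream pane, as soon as that pane commits."""
    return [([p.value], p.commit_time)
            for p in sorted(panes, key=lambda p: p.commit_time)]

def synchronized_panes(panes):
    """'AfterSynchronizedProcessingTime' (as sketched here): group
    upstream panes by the instant their trigger fired, and emit a
    downstream pane only after the slowest member of the group commits."""
    by_fire = {}
    for p in panes:
        by_fire.setdefault(p.fire_time, []).append(p)
    out = []
    for t in sorted(by_fire):
        group = by_fire[t]
        out.append(([p.value for p in group],
                    max(p.commit_time for p in group)))
    return out

# Two upstream panes from the t=60 firing (one is a straggler),
# plus one from the t=120 firing.
panes = [
    UpstreamPane(fire_time=60, commit_time=61, value=1),
    UpstreamPane(fire_time=60, commit_time=65, value=2),
    UpstreamPane(fire_time=120, commit_time=121, value=3),
]

print(eager_panes(panes))         # three downstream firings
print(synchronized_panes(panes))  # two firings, one per upstream instant
```

The sketch shows the latency trade-off Reuven describes: the synchronized variant emits fewer, "consistent" panes, but its emission time is gated on the slowest upstream straggler (commit time 65 above), while the eager variant fires immediately at the cost of more downstream panes.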