On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:
> I agree completely: Triggers control the output of the GBK.
>
> The issue is composite transforms, where there will be a GBK deep inside
> some code and the user cannot adjust the triggering.
>
> What a user really wants is "sink triggers
> <https://s.apache.org/beam-sink-triggers>" [1], a purely hypothetical
> feature where they specify the latency requirements on each _output_ and
> everything else is figured out automatically. Unfortunately, sink triggers
> require retractions, so each PCollection can be a complete changelog.
> Otherwise transformations cannot be transparently correct throughout a
> pipeline and triggers cannot be decoupled from pipeline logic. Retractions
> themselves are not necessarily complex in some cases (Flink SQL has them;
> they are extra easy for "pure" code) but require a massive reworking of
> the library of transforms, particularly IOs. And backwards-compatibility
> concerns for existing DoFns are somewhat tricky. We've had two prototypes
> [2] [3] and some important design investigations [4], but no time to
> really finish adding them, even as just an optional experiment. And once
> we have retractions, there is still a lot to figure out to finish sink
> triggers. They may not even really be possible!
>
> So for now, we do our best with the user setting up triggering at the
> beginning of the pipeline instead of the end of the pipeline. The very
> first GBK (which may be deep in library code) is controlled by the
> triggering they set up, and all the rest get the "continuation trigger",
> which tries to just let the data flow, unless they set up another bit of
> triggering. Some of our transforms do this for various reasons.
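[Editor's note: the "each PCollection can be a complete changelog" idea above can be made concrete with a small sketch. This is a toy model only, not Beam's retraction prototypes: a running per-key count that retracts its previous pane before emitting the new one, so downstream consumers never double-count speculative output.]

```python
from collections import defaultdict

def counting_changelog(events):
    """Toy model of a retraction-capable aggregation: for each input
    element, retract the stale per-key count (if any) and emit the new
    one, so the output stream is a complete changelog."""
    counts = defaultdict(int)
    out = []
    for key in events:
        if counts[key] > 0:
            out.append(("-", key, counts[key]))  # retract the stale pane
        counts[key] += 1
        out.append(("+", key, counts[key]))      # emit the updated pane
    return out

log = counting_changelog(["a", "a", "b"])
# A downstream consumer that applies "+" and "-" entries in order
# reconstructs exactly the latest count per key.
```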
>
> I think the conclusion of this particular thread is:
>
> - make all the SDKs use AfterSynchronizedProcessingTime triggers
> - allow runners to do whatever they want when they see an
>   AfterSynchronizedProcessingTime trigger
> - remove TimeDomain.SYNCHRONIZED_PROCESSING_TIME from the proto, since it
>   is only for timers and they should not use this
> - later, figure out if we want to add support for making downstream
>   triggering optional (could be useful prep for sink triggers)
>
> +1
>
> [1] https://s.apache.org/beam-sink-triggers
> [2] https://github.com/apache/beam/pull/4742
> [3] https://github.com/apache/beam/pull/9199
> [4] https://s.apache.org/beam-retractions
>
> On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> The same holds true for pane accumulation mode.
>>
>> Jan
>>
>> On 2/22/21 10:21 AM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I'm not sure if I got everything from this thread right, but from my
>> point of view, triggers are a property of GBK. They are a property of
>> neither windowing nor PCollection, but relate solely to GBK. This can
>> be seen from the fact that, unlike the windowFn, triggers are completely
>> ignored in stateful ParDo (there is no semantics for them, which is
>> fine). It would be cool if the model could be adjusted for that; this
>> would actually mean that the correct place to specify triggering is not
>> the Window PTransform but the GBK, i.e.
>>
>>   input.apply(GroupByKey.create().triggering(...))
>>
>> That would imply we simply have the default trigger for all GBKs,
>> unless explicitly changed, but for that particular instance only. I'm
>> not sure what the impacts on pipeline compatibility would be, though.
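[Editor's note: the API shape Jan proposes can be sketched as a toy builder. This is a hypothetical surface, not Beam's actual Java API (where triggering lives on the Window transform): every GBK instance carries the default trigger unless that particular instance is overridden.]

```python
class GroupByKey:
    """Toy model of a GBK that owns its own triggering, per the
    suggestion above. Hypothetical API shape for illustration only."""

    def __init__(self):
        # Every GBK defaults to the default trigger...
        self.trigger = "Default"

    @classmethod
    def create(cls):
        return cls()

    def triggering(self, trigger):
        # ...unless triggering is overridden on this instance only.
        self.trigger = trigger
        return self

gbk1 = GroupByKey.create().triggering("AfterProcessingTime(60s)")
gbk2 = GroupByKey.create()  # an unrelated GBK keeps the default trigger
```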
>>
>> Jan
>>
>> On 2/19/21 12:09 AM, Robert Bradshaw wrote:
>>
>> On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> I would prefer to leave downstream triggering up to the runner (or,
>>>> better, leave upstream triggering up to the runner, a la sink
>>>> triggers), but one problem is that without an explicit
>>>> AfterSynchronizedProcessingTime one can't tell whether the downstream
>>>> ProcessingTime trigger between two groupings is due to an explicit
>>>> re-triggering between them or inherited from one to the other.
>>>
>>> I mean to propose that there should be no triggering specified unless
>>> due to explicit re-triggering.
>>
>> You're saying that we leave the trigger (and perhaps other) fields of
>> the WindowingStrategy attached to PCollections downstream of the first
>> GBK unset in the proto? And let runners walk over the graph to infer
>> it? I could be OK with making this legal, though updating all SDKs and
>> Runners to handle this doesn't seem high priority at the moment.
>>
>>> (and BTW yes I agree about sink triggers, but we need retractions and
>>> probably some theoretical work before we can aim for that)
>>>
>>> Kenn
>>>
>>>> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Just for the thread I want to comment on another, more drastic
>>>>> approach: eliminate continuation triggers from the model, leaving
>>>>> downstream triggering up to a runner. This approach is not viable
>>>>> because transforms may need to change their behavior based on
>>>>> whether or not a trigger will fire more than once. Transforms can
>>>>> and do inspect the windowing strategy to do things differently.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I'll say that synchronized processing time has confused users
>>>>>> before.
>>>>>> Users sometimes use processing-time triggers to optimize latency,
>>>>>> banking that this will decouple stage latency from the long-tail
>>>>>> latency of previous stages. However, continuation triggers silently
>>>>>> switching to synchronized processing time defeated that, and it
>>>>>> wasn't clear to users why.
>>>>>>
>>>>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw
>>>>>> <rober...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw
>>>>>>>> <rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Of course the right answer is to just implement sink triggers
>>>>>>>>> and sidestep the question altogether :).
>>>>>>>>>
>>>>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime
>>>>>>>>> in the model makes the most sense, and runners can choose an
>>>>>>>>> implementation between firing eagerly and waiting some amount of
>>>>>>>>> time until they think all (most?) downstream results are in
>>>>>>>>> before firing, depending on how smart the runner wants to be. As
>>>>>>>>> you point out, they're all correct, and we'll have multiple
>>>>>>>>> firings due to the upstream trigger anyway, and this is safer
>>>>>>>>> than it used to be (though still possibly requires work).
>>>>>>>>
>>>>>>>> Just to clarify, as I got a little confused, is your suggestion:
>>>>>>>> leave AfterSynchronizedProcessingTime* triggers in the
>>>>>>>> model/proto, let the SDK put them in where they want, and let
>>>>>>>> runners decide how to interpret them? (this SGTM and requires the
>>>>>>>> least/no changes)
>>>>>>>
>>>>>>> Yep. We may want to update Python/Go to produce
>>>>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime
>>>>>>> triggers too, eventually, to better express intent.
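[Editor's note: the SDK behaviors discussed above can be sketched as a toy continuation-trigger function. This illustrates the inconsistency described in this thread; it is not the actual Beam SDK code, and the tuple encoding of triggers is invented for the sketch.]

```python
def continuation_trigger(trigger, sdk="java"):
    """Toy model of choosing the trigger for the GBK downstream of an
    aggregation.

    - Java: AfterProcessingTime becomes AfterSynchronizedProcessingTime,
      so the downstream GBK fires once the upstream panes have arrived.
    - Python (today): the same trigger is propagated again, adding a
      full extra processing-time delay at every downstream GBK.
    """
    if sdk == "python":
        return trigger  # propagate the identical trigger unchanged
    kind = trigger[0]
    if kind == "AfterProcessingTime":
        return ("AfterSynchronizedProcessingTime",)
    if kind == "AfterCount":
        return ("AfterCount", 1)  # fire on the first downstream element
    return trigger  # e.g. the default trigger continues as itself

upstream = ("AfterProcessingTime", 60)
java_downstream = continuation_trigger(upstream)              # synchronized
python_downstream = continuation_trigger(upstream, "python")  # 60s again
```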
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
>>>>>>>> related to this, except in implementation, and should be removed
>>>>>>>> either way.
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> TL;DR:
>>>>>>>>>> 1. should we replace "after synchronized processing time" with
>>>>>>>>>>    "after count 1"?
>>>>>>>>>> 2. should we remove "continuation trigger" and leave this to
>>>>>>>>>>    runners?
>>>>>>>>>>
>>>>>>>>>> ----
>>>>>>>>>>
>>>>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to
>>>>>>>>>> solve a specific problem. They are inconsistent across SDKs
>>>>>>>>>> today.
>>>>>>>>>>
>>>>>>>>>> - You have an aggregation/GBK with an aligned processing-time
>>>>>>>>>>   trigger, like "output every minute on the minute"
>>>>>>>>>> - You have a downstream aggregation/GBK between that and the
>>>>>>>>>>   sink
>>>>>>>>>> - You expect to have about one output every minute per
>>>>>>>>>>   key+window pair
>>>>>>>>>>
>>>>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>>>>> key+window of the downstream aggregation. The
>>>>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the
>>>>>>>>>> processing-time-based triggers to fire and commit their
>>>>>>>>>> outputs. The downstream aggregation will output as fast as
>>>>>>>>>> possible in panes consistent with the upstream aggregation.
>>>>>>>>>>
>>>>>>>>>> - The Java SDK behavior is as above: output "as fast as
>>>>>>>>>>   reasonable".
>>>>>>>>>> - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>>>>>   triggers but simply propagates the same trigger to the next
>>>>>>>>>>   GBK, creating additional delay.
>>>>>>>>>> - I don't know what the Go SDK may do, if it supports this at
>>>>>>>>>>   all.
>>>>>>>>>>
>>>>>>>>>> Any behavior could be defined as "correct".
>>>>>>>>>> A simple option could be to have the downstream aggregation
>>>>>>>>>> "fire always", aka "after element count 1". How would this
>>>>>>>>>> change things? We would potentially see many more outputs.
>>>>>>>>>>
>>>>>>>>>> Why did we do this in the first place? There are (at least)
>>>>>>>>>> these reasons:
>>>>>>>>>>
>>>>>>>>>> - Previously, triggers could "finish" an aggregation, thus
>>>>>>>>>>   dropping all further data. In this case, waiting for all
>>>>>>>>>>   outputs is critical or else you lose data. Now triggers
>>>>>>>>>>   cannot finish aggregations.
>>>>>>>>>> - Whenever there may be more than one pane, a user has to
>>>>>>>>>>   write logic to compensate and deal with it. Changing from a
>>>>>>>>>>   guaranteed single pane to multi-pane would break things. So
>>>>>>>>>>   if the user configures a single firing, all downstream
>>>>>>>>>>   aggregations must respect it. Now that triggers cannot
>>>>>>>>>>   finish, I think processing time can only be used in
>>>>>>>>>>   multi-pane contexts anyhow.
>>>>>>>>>> - The above example illustrates how the behavior in Java
>>>>>>>>>>   maintains something that the user will expect. Or so we
>>>>>>>>>>   think. Maybe users don't care.
>>>>>>>>>>
>>>>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>>>>> specifies triggering, it applies to the very nearest
>>>>>>>>>> aggregation/GBK. The SDK decides what triggering to insert
>>>>>>>>>> downstream. One possibility is to remove this and have it
>>>>>>>>>> unspecified, left to runner behavior.
>>>>>>>>>>
>>>>>>>>>> I think maybe these pieces of complexity are both not helpful
>>>>>>>>>> and also not (necessarily) breaking changes to alter,
>>>>>>>>>> especially considering we have inconsistency in the model.
>>>>>>>>>>
>>>>>>>>>> WDYT? And I wonder what this means for xlang and
>>>>>>>>>> portability... how does continuation triggering even work?
>>>>>>>>>> (if at all)
>>>>>>>>>>
>>>>>>>>>> Kenn
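[Editor's note: the trade-off in the TL;DR above can be made concrete with a small simulation. Toy model with illustrative commit times, not taken from the thread: one per-minute upstream firing produces panes for several keys that commit at slightly different times; the downstream aggregation either waits for all of them (AfterSynchronizedProcessingTime, the Java behavior) or fires per element (the proposed "after count 1").]

```python
# Upstream panes from one per-minute firing; commit times in seconds
# (illustrative numbers only).
upstream_commits = {"key_a": 60.2, "key_b": 60.9, "key_c": 61.5}

def synchronized_firing(commits):
    """AfterSynchronizedProcessingTime: one downstream pane, emitted
    once every contributing upstream pane has committed."""
    return [("all-keys", max(commits.values()))]

def fire_always(commits):
    """'After element count 1': one downstream pane per arriving
    element, so output starts earlier but there are many more panes."""
    return sorted(("pane", t) for t in commits.values())

print(synchronized_firing(upstream_commits))  # one pane, at 61.5
print(len(fire_always(upstream_commits)))     # 3 panes
```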