First, +1 to the conclusion of this thread.

One note regarding composite transforms and the triggers *inside* them: I think that propagating the triggering from the input PCollection might even be dangerous. Composite PTransforms that are sensitive to a change of triggering will (should!) override the input triggering, so adjusting it upfront will not work. There is a clear option for a composite PTransform (which includes one or more GBKs): expose an API to specify the _input_ triggering of the composite as a whole, i.e.

 input.apply(MyComposite.create().triggering())

which (consistently with how triggering works for a pure GBK) would change the input triggering of the PTransform (if we define a trigger as "buffer input in state, flush buffer when trigger fires"). The PTransform knows how it expands, so it is quite easy for it to do the output triggering correctly.
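
To make the "buffer input in state, flush buffer when trigger fires" definition concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam API: BufferingTrigger and its shouldFire predicate are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of a trigger: buffer inputs, emit the buffer as one pane when the trigger fires. */
final class BufferingTrigger<T> {
    // Hypothetical stand-in for a Beam trigger: decides whether the current buffer is flushed.
    private final Predicate<List<T>> shouldFire;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> panes = new ArrayList<>();

    BufferingTrigger(Predicate<List<T>> shouldFire) {
        this.shouldFire = shouldFire;
    }

    /** Buffers one element and flushes the buffer as a pane if the trigger fires. */
    void add(T element) {
        buffer.add(element);
        if (shouldFire.test(buffer)) {
            panes.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Returns the panes emitted so far. */
    List<List<T>> panes() {
        return panes;
    }

    public static void main(String[] args) {
        // Fire whenever three elements have been buffered.
        BufferingTrigger<Integer> everyThree = new BufferingTrigger<>(b -> b.size() >= 3);
        for (int i = 1; i <= 7; i++) {
            everyThree.add(i);
        }
        // prints [[1, 2, 3], [4, 5, 6]]; element 7 is still buffered
        System.out.println(everyThree.panes());
    }
}
```

In this reading, the triggering configured on the composite would play the role of shouldFire for the first GBK inside it.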

Regarding the sink triggering - out of curiosity, how does that differ from applying the triggering on the very first GBK(s) and then triggering all downstream GBKs using AfterPane.elementCountAtLeast(1)? It seems to me that, from the user's perspective, what I will want to define is not "how often output should be written", but "how quickly output should react to the change of input" - therefore I *must* trigger with at least this frequency from the source and then propagate each pane as quickly as possible to the output. Am I missing something?
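
The setup in the question can be sketched as a toy two-stage model in plain Java (not Beam API; PanePropagation, firstStage and downstreamStage are hypothetical names). The first stage stands in for a GBK with the user's trigger, here simply "fire every `batch` elements"; the downstream stage stands in for a GBK that fires on every incoming pane, as AfterPane.elementCountAtLeast(1) would.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy two-stage pipeline: a triggered first stage followed by a fire-on-every-pane stage. */
final class PanePropagation {
    /** First stage: emits one partial sum per `batch` inputs (stand-in for the user's trigger). */
    static List<Integer> firstStage(List<Integer> input, int batch) {
        List<Integer> panes = new ArrayList<>();
        int sum = 0;
        int buffered = 0;
        for (int value : input) {
            sum += value;
            buffered++;
            if (buffered == batch) {
                panes.add(sum); // trigger fires: flush the partial sum as one pane
                sum = 0;
                buffered = 0;
            }
        }
        return panes;
    }

    /** Downstream stage: fires on every incoming pane, emitting the running total. */
    static List<Integer> downstreamStage(List<Integer> upstreamPanes) {
        List<Integer> panes = new ArrayList<>();
        int total = 0;
        for (int pane : upstreamPanes) {
            total += pane;
            panes.add(total);
        }
        return panes;
    }

    public static void main(String[] args) {
        List<Integer> upstream = firstStage(List.of(1, 2, 3, 4, 5, 6), 2);
        List<Integer> downstream = downstreamStage(upstream);
        System.out.println(upstream);   // prints [3, 7, 11]
        System.out.println(downstream); // prints [3, 10, 21]
    }
}
```

In this single-key sketch the downstream stage emits exactly one pane per upstream pane, so the output rate follows the first trigger. It does not model several upstream keys feeding one downstream key.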

 Jan


On 2/22/21 9:53 PM, Reuven Lax wrote:
I really wish that we had found the time to build sink triggers. Jan is right - specifying triggers up front and having them propagate down is confusing (it's also a bit confusing for Windows, but with Windows the propagation at least makes sense). The fact that users rarely have access to the actual GBK operation means that allowing them to specify triggers on their sinks is the best approach.

On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw <rober...@google.com> wrote:

    On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:

        I agree completely: Triggers control the output of the GBK.

        The issue is composite transforms, where there will be a GBK
        deep inside some code and the user cannot adjust the triggering.

        What a user really wants is "sink triggers
        <https://s.apache.org/beam-sink-triggers>" [1], a purely
        hypothetical feature where they specify the latency
        requirements on each _output_ and everything else is figured
        out automatically. Unfortunately, sink triggers require
        retractions, so each PCollection can be a complete changelog.
        Otherwise transformations cannot be transparently correct
        throughout a pipeline and triggers cannot be decoupled from
        pipeline logic. Retractions themselves are not necessarily
        complex in some cases (Flink SQL has them - they are extra
        easy for "pure" code) but require a massive reworking of the
        library of transforms, particularly IOs. And backwards
        compatibility concerns for existing DoFns are somewhat tricky.
        We've had two prototypes [2] [3] and some important design
        investigations [4], but no time to really finish adding them,
        even as just an optional experiment. And once we have
        retractions, there is still a lot to figure out to finish sink
        triggers. They may not even really be possible!

        So for now, we do our best with the user setting up triggering
        at the beginning of the pipeline instead of the end of the
        pipeline. The very first GBK (which may be deep in library
        code) is controlled by the triggering they set up and all the
        rest get the "continuation trigger" which tries to just let
        the data flow. Unless they set up another bit of triggering.
        Some of our transforms do this for various reasons.

        I think the conclusion of this particular thread is:

         - make all the SDKs use AfterSynchronizedProcessingTime triggers
         - allow runners to do whatever they want when they see
        AfterSynchronizedProcessingTime trigger
         - remove TimeDomain.SYNCHRONIZED_PROCESSING_TIME from the
        proto since it is only for timers and they should not use this
         - later, figure out if we want to add support for making
        downstream triggering optional (could be useful prep for sink
        triggers)


    +1

        [1] https://s.apache.org/beam-sink-triggers
        [2] https://github.com/apache/beam/pull/4742
        [3] https://github.com/apache/beam/pull/9199
        [4] https://s.apache.org/beam-retractions

        On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:

            The same holds true for pane accumulation mode.

             Jan

            On 2/22/21 10:21 AM, Jan Lukavský wrote:

            Hi,

            I'm not sure if I got everything from this thread right,
            but from my point of view, triggers are a property of GBK.
            They are a property of neither windowing nor PCollection,
            but relate solely to GBK. This can be seen from the fact
            that, unlike windowFn, triggers are completely ignored in
            stateful ParDo (there are no semantics for them, which is
            fine). It would be cool if the model could be adjusted
            for that - this would actually mean that the correct
            place to specify triggering is not the Window
            PTransform, but the GBK, i.e.

             input.apply(GroupByKey.create().triggering(...))

            That would imply we simply have a default trigger for all
            GBKs, unless explicitly changed, but for that particular
            instance only. I'm not sure what the impacts on pipeline
            compatibility would be, though.

             Jan

            On 2/19/21 12:09 AM, Robert Bradshaw wrote:
            On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles
            <k...@apache.org> wrote:


                On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw
                <rober...@google.com> wrote:

                    I would prefer to leave downstream triggering up
                    to the runner (or, better, leave upstream
                    triggering up to the runner, a la sink
                    triggers), but one problem is that without an
                    explicit AfterSynchronizedProcessingTime one
                    can't tell if the downstream ProcessingTime
                    between two groupings is due to an explicit
                    re-triggering between them or inherited from one
                    to the other.


                I mean to propose that there should be no triggering
                specified unless due to explicit re-triggering.


            You're saying that we leave the trigger (and perhaps
            other) fields of the WindowingStrategy attached to
            PCollections downstream of the first GBK unset in the
            proto? And let runners walk over the graph to infer it?
            I could be OK with making this legal, though updating
            all SDKs and Runners to handle this doesn't seem
            high priority at the moment.


                (and BTW yes I agree about sink triggers, but we
                need retractions and probably some theoretical work
                before we can aim for that)

                Kenn

                    On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles
                    <k...@apache.org> wrote:

                        Just for the thread I want to comment on
                        another, more drastic approach: eliminate
                        continuation triggers from the model,
                        leaving downstream triggering up to a
                        runner. This approach is not viable because
                        transforms may need to change their behavior
                        based on whether or not a trigger will fire
                        more than once. Transforms can and do
                        inspect the windowing strategy to do things
                        differently.

                        Kenn

                        On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax
                        <re...@google.com> wrote:

                            I'll say that synchronized processing
                            time has confused users before. Users
                            sometimes use processing-time triggers
                            to optimize latency, banking that that
                            will decouple stage latency from the
                            long-tail latency of previous stages.
                            However continuation triggers silently
                            switching to synchronized processing
                            time has defeated that, and it wasn't
                            clear to users why.

                            On Wed, Feb 17, 2021 at 11:12 AM Robert
                            Bradshaw <rober...@google.com> wrote:

                                On Fri, Feb 12, 2021 at 9:09 AM
                                Kenneth Knowles <k...@apache.org> wrote:


                                    On Thu, Feb 11, 2021 at 9:38 PM
                                    Robert Bradshaw
                                    <rober...@google.com> wrote:

                                        Of course the right answer
                                        is to just implement sink
                                        triggers and sidestep the
                                        question altogether :).

                                        In the meantime, I think
                                        leaving
                                        AfterSynchronizedProcessingTime
                                        in the model makes the most
                                        sense, and runners can
                                        choose an implementation
                                        between firing eagerly and
                                        waiting some amount of time
                                        until they think all (most?)
                                        downstream results are in
                                        before firing, depending on
                                        how smart the runner wants
                                        to be. As you point out,
                                        they're all correct, and
                                        we'll have multiple firings
                                        due to the upstream trigger
                                        anyway, and this is safer
                                        than it used to be (though
                                        still possibly requires work).


                                    Just to clarify, as I got a
                                    little confused, is your
                                    suggestion: Leave
                                    AfterSynchronizedProcessingTime*
                                    triggers in the model/proto, let
                                    the SDK put them in where they
                                    want, and let runners decide how
                                    to interpret them? (this SGTM
                                    and requires the least/no changes)


                                Yep. We may want to update Python/Go
                                to produce
                                AfterSynchronizedProcessingTime
                                downstream of ProcessingTime
                                triggers too, eventually, to better
                                express intent.

                                    Kenn

                                    *noting that
                                    TimeDomain.SYNCHRONIZED_PROCESSING_TIME
                                    is not related to this, except
                                    in implementation, and should be
                                    removed either way.

                                        On Wed, Feb 10, 2021 at 1:37
                                        PM Kenneth Knowles
                                        <k...@apache.org> wrote:

                                            Hi all,

                                            TL;DR:
                                            1. should we replace
                                            "after synchronized
                                            processing time" with
                                            "after count 1"?
                                            2. should we remove
                                            "continuation trigger"
                                            and leave this to runners?

                                            ----

                                            "AfterSynchronizedProcessingTime"
                                            triggers were invented
                                            to solve a specific
                                            problem. They are
                                            inconsistent across SDKs
                                            today.

                                             - You have an
                                            aggregation/GBK with
                                            aligned processing time
                                            trigger like ("output
                                            every minute on the minute")
                                             - You have a downstream
                                            aggregation/GBK between
                                            that and the sink
                                             - You expect to have
                                            about one output every
                                            minute per key+window pair

                                            Any output of the
                                            upstream aggregation may
                                            contribute to any
                                            key+window of the
                                            downstream aggregation.
                                            The
                                            AfterSynchronizedProcessingTime
                                            trigger waits for all
                                            the processing time
                                            based triggers to fire
                                            and commit their
                                            outputs. The downstream
                                            aggregation will output
                                            as fast as possible in
                                            panes consistent with
                                            the upstream aggregation.

                                             - The Java SDK behavior
                                            is as above, to output
                                            "as fast as reasonable".
                                             - The Python SDK never
                                            uses
                                            "AfterSynchronizedProcessingTime"
                                            triggers but simply
                                            propagates the same
                                            trigger to the next GBK,
                                            creating additional delay.
                                             - I don't know what the
                                            Go SDK may do, if it
                                            supports this at all.

                                            Any behavior could be
                                            defined as "correct". A
                                            simple option could be
                                            to have the downstream
                                            aggregation "fire
                                            always" aka "after
                                            element count 1". How
                                            would this change
                                            things? We would
                                            potentially see many
                                            more outputs.

                                            Why did we do this in
                                            the first place? There
                                            are (at least) these
                                            reasons:

                                             - Previously, triggers
                                            could "finish" an
                                            aggregation thus
                                            dropping all further
                                            data. In this case,
                                            waiting for all outputs
                                            is critical or else you
                                            lose data. Now triggers
                                            cannot finish aggregations.
                                             - Whenever there may be
                                            more than one pane, a
                                            user has to write logic
                                            to compensate and deal
                                            with it. Changing from
                                            guaranteed single pane
                                            to multi-pane would
                                            break things. So if the
                                            user configures a single
                                            firing, all downstream
                                            aggregations must
                                            respect it. Now that
                                            triggers cannot finish,
                                            I think processing time
                                            can only be used in
                                            multi-pane contexts anyhow.
                                             - The above example
                                            illustrates how the
                                            behavior in Java
                                            maintains something that
                                            the user will expect. Or
                                            so we think. Maybe users
                                            don't care.

                                            How did we get into this
                                            inconsistent state? When
                                            the user specifies
                                            triggering it applies to
                                            the very nearest
                                            aggregation/GBK. The SDK
                                            decides what triggering
                                            to insert downstream.
                                            One possibility is to
                                            remove this and have it
                                            unspecified, left to
                                            runner behavior.

                                            I think maybe these
                                            pieces of complexity are
                                            both not helpful and
                                            also not (necessarily)
                                            breaking changes to
                                            alter, especially
                                            considering we have
                                            inconsistency in the model.

                                            WDYT? And I wonder what
                                            this means for xlang and
                                            portability... how does
                                            continuation triggering
                                            even work? (if at all)

                                            Kenn
