Hi Kenn,

> The idea with sink triggering is that the level of abstraction is raised. You have a DoFn (more generally IO transform) that writes to some external system, and you request updates every ten seconds. This specification is propagated to cause all the GBKs in the pipeline to emit data at a rate to enable updates to that IO every ten seconds.

I get this, this makes totally sense. But - what else could the propagation meaningfully do, then to propagate the 10 seconds triggering to the very first GBK(s) and then try to push the outcome of these PTransforms as fast as possible through the pipeline? Yes, seems it would require retractions, at least in cases when the DAG contains multiple paths from root(s) to leaf. It seems to me, that the intermediate GBK(s) play no role, because if they do not trigger as fast as possible (and retract wrongly triggered outputs due to out-of-orderness), what they do, is they add latency and actually make the "sink triggering" not trigger at the configured frequency. Everything else seems clear to me, I just don't get this part. Is is possible to describe a specific a example where an inner GBK would trigger with some different trigger than with each pane?

 Jan

On 2/25/21 12:44 AM, Kenneth Knowles wrote:


On Wed, Feb 24, 2021 at 12:44 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Robert,

    > Here "sink" is really any observable outside effect, so I
    think "how often output should be written" and "how quickly output
    should react to the change of input" are the same.

    The difference is in the input trigger - let's imagine, that I
    have two chained GBKs (A and B). If I trigger A every minute, but
    B every second, I will output 60 records per minute, but 59 of
    them will be the same. That's why it seems to me, that meaningful
    "sink" triggering has to start at the input and then propagate
    with each pane.

The idea with sink triggering is that the level of abstraction is raised. You have a DoFn (more generally IO transform) that writes to some external system, and you request updates every ten seconds. This specification is propagated to cause all the GBKs in the pipeline to emit data at a rate to enable updates to that IO every ten seconds.

Sinks will need separate configurations to handle multiple panes (updates/retractions) vs final values, and we can validate that a sink can support a particular triggering strategy. Sinks already need this, but we haven't solved the problem very formally or systematically. In many cases, these are just two different sinks - for example a CSV file with an extra column to indicate overwrite/retraction is really a different sink than just appending. They write to the same storage system, but the relationship of the input records to the output storage differs.

There's a lot of unsolved problems in terms of exactly how the triggering requirements of a sink can feed back to upstream aggregations to cause them to trigger at appropriate times. It could be static (inferring upstream triggering) but seems like it might have to be dynamic (running a state machine at the sink that broadcasts messages). I don't think this is straightforward, nor is it guaranteed to be doable without knobs or some fresh ideas.

Kenn

    > As an example, if I want, say, hourly output, triggering hourly
    at the source and then as quickly as possible from then on may be
    wasteful. It may also be desirable to arrange such that certain
    transforms only have a single pane per window, which is easier to
    propagate up than down. As another example, consider accumulating
    vs. discarding. If I have CombineValues(sum) followed by a
    re-keying and another CombineValues(sum), and I want the final
    output to be accumulating, the first must be discarding (or,
    better, retractions). Propagating upwards is possible in a way
    propagating downward is not.

    I'm not sure I understand this. If I want hourly output, I cannot
    trigger source with lower frequency. If I trigger source with
    hourly, but do not propagate this as fast as possible, I'm
    inevitably introducing additional latency (that's the definition
    of "not as fast as possible") in downstream processing. Therefore
    the final triggering cannot be "hourly output" at least not with
    regard to the rate of change in inputs.

    On 2/23/21 5:47 PM, Robert Bradshaw wrote:
    On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        First, +1 to the conclusion of this thread.

        One note regarding the composite transforms and triggers
        *inside* those transforms - I think that propagating the
        triggering from input PCollection might be even dangerous and
        composite PTransforms that would be sensitive to the change
        of triggering will (should!) override the input triggering,
        and therefore adjusting it upfront will not work. There is
        clear option for composite PTransform (which includes one or
        more GBKs) to create API to specify the _input_ triggering of
        the composite as a whole, i.e.

         input.apply(MyComposite.create().triggering())

        which (consistently with how triggering works for pure GBK)
        would change the input triggering (if we define trigger as
        "buffer input in state, flush buffer when trigger fires") of
        the PTransform. The PTransform knows how it expands and so it
        is quite easy to do the output triggering correctly.

    When we originally explored this (for windowing, before
    triggering existed) we looked at the number of composite
    operations (combining, joining, cogbk, ...) that contained GBKs
    and realized it would add a lot of boilerplate to manually pass
    through the windowing information to each. Worse, this is a
    burden placed on every author of a composite operation (and
    omitting this argument, or hard coding a default, would be
    strictly worse). Triggering doesn't flow as nicely, but requiring
    it on every subtransform invocation during pipeline construction
    would have the same downsides of verbosity.

        Regarding the sink triggering - out of curiosity, how does
        that differ from applying the triggering on the very first
        GBK(s) and the subsequently trigger all downstream GBKs using
        AfterPane.elementCountAtLeast(1)? It seems to me, that from
        user perspective what I will want to define is not "how often
        output should be written", but "how quickly output should
        react to the change of input" - therefore I *must* trigger
        with at least this frequency from the source and then
        propagate each pane as quickly as possible to the output. Am
        I missing something?

    Here "sink" is really any observable outside effect, so I
    think "how often output should be written" and "how quickly
    output should react to the change of input" are the same.

    As an example, if I want, say, hourly output, triggering hourly
    at the source and then as quickly as possible from then on may be
    wasteful. It may also be desirable to arrange such that certain
    transforms only have a single pane per window, which is easier to
    propagate up than down. As another example, consider accumulating
    vs. discarding. If I have CombineValues(sum) followed by a
    re-keying and another CombineValues(sum), and I want the final
    output to be accumulating, the first must be discarding (or,
    better, retractions). Propagating upwards is possible in a way
    propagating downward is not.


         Jan


        On 2/22/21 9:53 PM, Reuven Lax wrote:
        I really wish that we had found the time to build sink
        triggers. Jan is right - specifying triggers up front and
        having them propagate down is confusing (it's also a bit
        confusing for Windows, but with Windows the propagation at
        least makes sense). The fact that users rarely have access
        to the actual GBK operation means that allowing them to
        specify triggers on their sinks is the best approach.

        On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw
        <rober...@google.com <mailto:rober...@google.com>> wrote:

            On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles
            <k...@apache.org <mailto:k...@apache.org>> wrote:

                I agree completely: Triggers control the output of
                the GBK.

                The issue is composite transforms, where there will
                be a GBK deep inside some code and the user cannot
                adjust the triggering.

                What a user really wants is "sink triggers
                <https://s.apache.org/beam-sink-triggers>" [1], a
                purely hypothetical feature where they specify the
                latency requirements on each _output_ and everything
                else is figured out automatically. Unfortunately,
                sink triggers require retractions, so each
                PCollection can be a complete changelog. Otherwise
                transformations cannot be transparently correct
                throughout a pipeline and triggers cannot be
                decoupled from pipeline logic. Retractions
                themselves are not necessarily complex in some cases
                (Flink SQL has them - they are extra easy for "pure"
                code) but require a massive working of the library
                of transforms, particularly IOs. And backwards
                compatibility concerns for existing DoFns are
                somewhat tricky. We've had two prototypes [2] [3]
                and some important design investigations [4], but no
                time to really finish adding them, even as just an
                optional experiment. And once we have retractions,
                there is still a lot to figure out to finish sink
                triggers. They may not even really be possible!

                So for now, we do our best with the user setting up
                triggering at the beginning of the pipeline instead
                of the end of the pipeline. The very first GBK
                (which may be deep in library code) is controlled by
                the triggering they set up and all the rest get the
                "continuation trigger" which tries to just let the
                data flow. Unless they set up another bit of
                triggering. Some of our transforms do this for
                various reasons.

                I think the conclusion of this particular thread is:

                 - make all the SDKs use
                AfterSynchronizedProcessingTime triggers
                 - allow runners to do whatever they want when they
                see AfterSynchronizedProcessingTime trigger
                 - remove TimeDomain.afterSynchronizedProcessingTime
                from the proto since it is only for timers and they
                should not use this
                 - later, figure out if we want to add support for
                making downstream triggering optional (could be
                useful prep for sink triggers)


            +1

                [1] https://s.apache.org/beam-sink-triggers
                [2] https://github.com/apache/beam/pull/4742
                [3] https://github.com/apache/beam/pull/9199
                [4] https://s.apache.org/beam-retractions

                On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    The same holds true for pane accumulation mode.

                     Jan

                    On 2/22/21 10:21 AM, Jan Lukavský wrote:

                    Hi,

                    I'm not sure if I got everything from this
                    thread right, but from my point of view,
                    triggers are property of GBK. They are property
                    of neither windowing, nor PCollection, but
                    relate solely to GBK. This can be seen from the
                    fact, that unlike windowFn, triggers are
                    completely ignored in stateful ParDo (there is
                    no semantics for them, which is fine). It would
                    be cool if the model could be adjusted for that
                    - this would actually mean, that the correct
                    place, where to specify triggering is not
                    Window PTransform, but the GBK, i.e.

                     input.apply(GroupByKey.create().triggering(...))

                    That would imply we simply have default trigger
                    for all GBKs, unless explicitly changed, but
                    for that particular instance only. I'm not sure
                    what the impacts on pipeline compatibility
                    would be, though.

                     Jan

                    On 2/19/21 12:09 AM, Robert Bradshaw wrote:
                    On Wed, Feb 17, 2021 at 1:56 PM Kenneth
                    Knowles <k...@apache.org
                    <mailto:k...@apache.org>> wrote:


                        On Wed, Feb 17, 2021 at 1:06 PM Robert
                        Bradshaw <rober...@google.com
                        <mailto:rober...@google.com>> wrote:

                            I would prefer to leave
                            downstream triggering up to the runner
                            (or, better, leave upstream triggering
                            up to the runner, a la sink triggers),
                            but one problem is that without an
                            explicit AfterSynchronizedProcessingTime
                            one can't tell if the downstream
                            ProcessingTime between two groupings
                            is due to an explicit re-triggering
                            between them or inherited from one to
                            the other.


                        I mean to propose that there should be no
                        triggering specified unless due to
                        explicit re-triggering.


                    You're saying that we leave the trigger (and
                    perhaps other) fields of the WindowingStrategy
                    attached to PCollections downstream the first
                    GBK unset in the proto? And let runners walk
                    over the graph to infer it? I could be OK
                    with making this legal, though updating all
                    SDKs and Runners to handle this doesn't seem
                    high priority at the moment.


                        (and BTW yes I agree about sink triggers,
                        but we need retractions and probably some
                        theoretical work before we can aim for that)

                        Kenn

                            On Wed, Feb 17, 2021 at 12:37 PM
                            Kenneth Knowles <k...@apache.org
                            <mailto:k...@apache.org>> wrote:

                                Just for the thread I want to
                                comment on another, more drastic
                                approach: eliminate continuation
                                triggers from the model, leaving
                                downstream triggering up to a
                                runner. This approach is not
                                viable because transforms may need
                                to change their behavior based on
                                whether or not a trigger will fire
                                more than once. Transforms can and
                                do inspect the windowing strategy
                                to do things differently.

                                Kenn

                                On Wed, Feb 17, 2021 at 11:47 AM
                                Reuven Lax <re...@google.com
                                <mailto:re...@google.com>> wrote:

                                    I'll say that synchronized
                                    processing time has confused
                                    users before. Users sometimes
                                    use processing-time triggers
                                    to optimize latency, banking
                                    that that will decouple
                                    stage latency from the
                                    long-tail latency of previous
                                    stages. However continuation
                                    triggers silently switching to
                                    synchronized processing time
                                    has defeated that, and it
                                    wasn't clear to users why.

                                    On Wed, Feb 17, 2021 at 11:12
                                    AM Robert Bradshaw
                                    <rober...@google.com
                                    <mailto:rober...@google.com>>
                                    wrote:

                                        On Fri, Feb 12, 2021 at
                                        9:09 AM Kenneth Knowles
                                        <k...@apache.org
                                        <mailto:k...@apache.org>>
                                        wrote:


                                            On Thu, Feb 11, 2021
                                            at 9:38 PM Robert
                                            Bradshaw
                                            <rober...@google.com
                                            <mailto:rober...@google.com>>
                                            wrote:

                                                Of course the
                                                right answer is to
                                                just implement
                                                sink triggers and
                                                sidestep the
                                                question
                                                altogether :).

                                                In the meantime, I
                                                think leaving
                                                AfterSynchronizedProcessingTime
                                                in the model makes
                                                the most sense,
                                                and runners can
                                                choose an
                                                implementation
                                                between firing
                                                eagerly and
                                                waiting some
                                                amount of time
                                                until they think
                                                all (most?)
                                                downstream results
                                                are in before
                                                firing, depending
                                                on how smart the
                                                runner wants to
                                                be. As you point
                                                out, they're all
                                                correct, and we'll
                                                have multiple
                                                firings due to the
                                                upstream trigger
                                                anyway, and this
                                                is safer than it
                                                used to be (though
                                                still possibly
                                                requires work).


                                            Just to clarify, as I
                                            got a little confused,
                                            is your suggestion:
                                            Leave
                                            AfterSynchronizedProcessingTime*
                                            triggers in the
                                            model/proto, let the
                                            SDK put them in where
                                            they want, and
                                            let runners decide how
                                            to interpret them?
                                            (this SGTM and
                                            requires the least/no
                                            changes)


                                        Yep. We may want to update
                                        Python/Go to produce
                                        AfterSynchronizedProcessingTime
                                        downstream of
                                        ProcessingTime triggers
                                        too, eventually, to better
                                        express intent.

                                            Kenn

                                            *noting that
                                            
TimeDomain.SYNCHRONIZED_PROCESSING_TIME
                                            is not related to
                                            this, except in
                                            implementation, and
                                            should be removed
                                            either way.

                                                On Wed, Feb 10,
                                                2021 at 1:37 PM
                                                Kenneth Knowles
                                                <k...@apache.org
                                                <mailto:k...@apache.org>>
                                                wrote:

                                                    Hi all,

                                                    TL;DR:
                                                    1. should we
                                                    replace "after
                                                    synchronized
                                                    processing
                                                    time" with
                                                    "after count 1"?
                                                    2. should we
                                                    remove
                                                    "continuation
                                                    trigger" and
                                                    leave this to
                                                    runners?

                                                    ----

                                                    
"AfterSynchronizedProcessingTime"
                                                    triggers were
                                                    invented to
                                                    solve a
                                                    specific
                                                    problem. They
                                                    are
                                                    inconsistent
                                                    across SDKs today.

                                                     - You have an
                                                    aggregation/GBK
                                                    with aligned
                                                    processing
                                                    time trigger
                                                    like ("output
                                                    every minute
                                                    on the minute")
                                                     - You have a
                                                    downstream
                                                    aggregation/GBK
                                                    between that
                                                    and the sink
                                                     - You expect
                                                    to have about
                                                    one output
                                                    every minute
                                                    per key+window
                                                    pair

                                                    Any output of
                                                    the upstream
                                                    aggregation
                                                    may contribute
                                                    to any
                                                    key+window of
                                                    the downstream
                                                    aggregation.
                                                    The
                                                    
AfterSynchronizedProcessingTime
                                                    trigger waits
                                                    for all the
                                                    processing
                                                    time based
                                                    triggers to
                                                    fire and
                                                    commit their
                                                    outputs. The
                                                    downstream
                                                    aggregation
                                                    will output as
                                                    fast as
                                                    possible in
                                                    panes
                                                    consistent
                                                    with the
                                                    upstream
                                                    aggregation.

                                                     - The Java
                                                    SDK behavior
                                                    is as above,
                                                    to output "as
                                                    fast as
                                                    reasonable".
                                                     - The Python
                                                    SDK never uses
                                                    
"AfterSynchronizedProcessingTime"
                                                    triggers but
                                                    simply
                                                    propagates the
                                                    same trigger
                                                    to the next
                                                    GBK, creating
                                                    additional delay.
                                                     - I don't
                                                    know what the
                                                    Go SDK may do,
                                                    if it supports
                                                    this at all.

                                                    Any behavior
                                                    could be
                                                    defined as
                                                    "correct". A
                                                    simple option
                                                    could be to
                                                    have the
                                                    downstream
                                                    aggregation
                                                    "fire always"
                                                    aka "after
                                                    element count
                                                    1". How would
                                                    this change
                                                    things? We
                                                    would
                                                    potentially
                                                    see many more
                                                    outputs.

                                                    Why did we do
                                                    this in the
                                                    first place?
                                                    There are (at
                                                    least) these
                                                    reasons:

                                                     - Previously,
                                                    triggers could
                                                    "finish" an
                                                    aggregation
                                                    thus dropping
                                                    all further
                                                    data. In this
                                                    case, waiting
                                                    for all
                                                    outputs is
                                                    critical or
                                                    else you lose
                                                    data. Now
                                                    triggers
                                                    cannot finish
                                                    aggregations.
                                                     - Whenever
                                                    there may be
                                                    more than one
                                                    pane, a user
                                                    has to write
                                                    logic to
                                                    compensate and
                                                    deal with it.
                                                    Changing from
                                                    guaranteed
                                                    single pane to
                                                    multi-pane
                                                    would break
                                                    things. So if
                                                    the user
                                                    configures a
                                                    single firing,
                                                    all downstream
                                                    aggregations
                                                    must respect
                                                    it. Now that
                                                    triggers
                                                    cannot finish,
                                                    I think
                                                    processing
                                                    time can only
                                                    be used in
                                                    multi-pane
                                                    contexts anyhow.
                                                     - The above
                                                    example
                                                    illustrates
                                                    how the
                                                    behavior in
                                                    Java maintains
                                                    something that
                                                    the user will
                                                    expect. Or so
                                                    we think.
                                                    Maybe users
                                                    don't care.

                                                    How did we get
                                                    into this
                                                    inconsistent
                                                    state? When
                                                    the user
                                                    specifies
                                                    triggering it
                                                    applies to the
                                                    very nearest
                                                    aggregation/GBK.
                                                    The SDK
                                                    decides what
                                                    triggering to
                                                    insert
                                                    downstream.
                                                    One
                                                    possibility is
                                                    to remove this
                                                    and have it
                                                    unspecified,
                                                    left to runner
                                                    behavior.

                                                    I think maybe
                                                    these pieces
                                                    of complexity
                                                    are both not
                                                    helpful and
                                                    also not
                                                    (necessarily)
                                                    breaking
                                                    changes to
                                                    alter,
                                                    especially
                                                    considering we
                                                    have
                                                    inconsistency
                                                    in the model.

                                                    WDYT? And I
                                                    wonder what
                                                    this means for
                                                    xlang and
                                                    portability...
                                                    how does
                                                    continuation
                                                    triggering
                                                    even work? (if
                                                    at all)

                                                    Kenn

Reply via email to