On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:

> I agree completely: Triggers control the output of the GBK.
>
> The issue is composite transforms, where there will be a GBK deep inside
> some code and the user cannot adjust the triggering.
>
> What a user really wants is "sink triggers
> <https://s.apache.org/beam-sink-triggers>" [1], a purely hypothetical
> feature where they specify the latency requirements on each _output_ and
> everything else is figured out automatically. Unfortunately, sink triggers
> require retractions, so each PCollection can be a complete changelog.
> Otherwise transformations cannot be transparently correct throughout a
> pipeline and triggers cannot be decoupled from pipeline logic. Retractions
> themselves are not necessarily complex in some cases (Flink SQL has them -
> they are extra easy for "pure" code) but require a massive reworking of the
> library of transforms, particularly IOs. And backwards-compatibility
> concerns for existing DoFns are somewhat tricky. We've had two prototypes
> [2] [3] and some important design investigations [4], but no time to really
> finish adding them, even as just an optional experiment. And once we have
> retractions, there is still a lot to figure out to finish sink triggers.
> They may not even really be possible!
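>
> (Purely to illustrate retraction/changelog semantics - this is not a
> Beam API, and the Delta type here is hypothetical. Each element is an
> addition or a retraction, so a downstream consumer stays correct when
> an upstream pane is amended.)
>
>   // Hypothetical sketch, plain Java, not Beam:
>   class Delta {
>     final String key;
>     final long value;
>     final boolean retraction;
>     Delta(String key, long value, boolean retraction) {
>       this.key = key; this.value = value; this.retraction = retraction;
>     }
>     long signed() { return retraction ? -value : value; }
>   }
>   // Upstream first emits ("a", 3, add). A later pane amends it by
>   // emitting ("a", 3, retract) then ("a", 5, add); a downstream sum of
>   // signed() values converges to 5 - the PCollection is a changelog.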
>
> So for now, we do our best with the user setting up triggering at the
> beginning of the pipeline instead of the end of the pipeline. The very
> first GBK (which may be deep in library code) is controlled by the
> triggering they set up and all the rest get the "continuation trigger"
> which tries to just let the data flow, unless they set up another bit of
> triggering along the way - which some of our transforms do for various
> reasons.
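>
> A rough sketch of that in the Java SDK (assuming a PCollection<String>
> named "input"; imports from org.apache.beam.sdk.transforms and
> org.apache.beam.sdk.transforms.windowing elided):
>
>   // Triggering is declared once, on the Window transform nearest the
>   // source. The first GBK (even one buried inside a composite such as
>   // Count.perElement) obeys it; later GBKs get the continuation trigger.
>   PCollection<KV<String, Long>> counts = input
>       .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
>           .triggering(Repeatedly.forever(
>               AfterProcessingTime.pastFirstElementInPane()
>                   .plusDelayOf(Duration.standardMinutes(1))))
>           .withAllowedLateness(Duration.ZERO)
>           .discardingFiredPanes())
>       .apply(Count.perElement());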
>
> I think the conclusion of this particular thread is:
>
>  - make all the SDKs use AfterSynchronizedProcessingTime triggers
>  - allow runners to do whatever they want when they see an
> AfterSynchronizedProcessingTime trigger
>  - remove TimeDomain.SYNCHRONIZED_PROCESSING_TIME from the proto, since
> it is only for timers and they should not use this
>  - later, figure out if we want to add support for making downstream
> triggering optional (could be useful prep for sink triggers)
>

+1


> [1] https://s.apache.org/beam-sink-triggers
> [2] https://github.com/apache/beam/pull/4742
> [3] https://github.com/apache/beam/pull/9199
> [4] https://s.apache.org/beam-retractions
>
> On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> The same holds true for pane accumulation mode.
>>
>>  Jan
>> On 2/22/21 10:21 AM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I'm not sure if I got everything from this thread right, but from my
>> point of view, triggers are a property of GBK. They are a property of
>> neither windowing nor PCollection, but relate solely to GBK. This can be
>> seen from the fact that, unlike the windowFn, triggers are completely
>> ignored in stateful ParDo (there are no semantics for them, which is
>> fine). It would be cool if the model could be adjusted for that - it
>> would mean that the correct place to specify triggering is not the
>> Window PTransform but the GBK, i.e.
>>
>>  input.apply(GroupByKey.create().triggering(...))
>>
>> That would imply we simply have a default trigger for all GBKs, unless
>> explicitly changed, and the change would apply to that particular
>> instance only. I'm not sure what the impact on pipeline compatibility
>> would be, though.
>>
>>  Jan
>> On 2/19/21 12:09 AM, Robert Bradshaw wrote:
>>
>> On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>> On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> I would prefer to leave downstream triggering up to the runner (or,
>>>> better, leave upstream triggering up to the runner, a la sink triggers),
>>>> but one problem is that without an explicit AfterSynchronizedProcessingTime
>>>> one can't tell whether a downstream ProcessingTime trigger between two
>>>> groupings is due to an explicit re-triggering between them or is
>>>> inherited from one to the other.
>>>>
>>>
>>> I mean to propose that there should be no triggering specified unless
>>> due to explicit re-triggering.
>>>
>>
>> You're saying that we leave the trigger (and perhaps other) fields of the
>> WindowingStrategy attached to PCollections downstream of the first GBK unset
>> in the proto? And let runners walk over the graph to infer it? I could be
>> OK with making this legal, though updating all SDKs and Runners to handle
>> this doesn't seem high priority at the moment.
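>>
>> (A sketch of what that inference might look like - hypothetical
>> runner-side code; hasExplicitTrigger and triggerOfNearestUpstreamGBK
>> are made-up helpers, no such API exists today:)
>>
>>   Trigger inferTrigger(PCollection<?> pc) {
>>     if (hasExplicitTrigger(pc)) {
>>       return pc.getWindowingStrategy().getTrigger();
>>     }
>>     // Walk upstream to the nearest GBK and inherit the continuation
>>     // of whatever trigger that GBK fired with.
>>     return triggerOfNearestUpstreamGBK(pc).getContinuationTrigger();
>>   }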
>>
>>
>>>
>>> (and BTW yes I agree about sink triggers, but we need retractions and
>>> probably some theoretical work before we can aim for that)
>>>
>>> Kenn
>>>
>>>
>>>> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Just for the thread I want to comment on another, more drastic
>>>>> approach: eliminate continuation triggers from the model, leaving
>>>>> downstream triggering up to a runner. This approach is not viable because
>>>>> transforms may need to change their behavior based on whether or not a
>>>>> trigger will fire more than once. Transforms can and do inspect the
>>>>> windowing strategy to do things differently.
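>>>>>
>>>>> (For example, inside a composite's expand() - a sketch; the exact
>>>>> checks vary by transform:)
>>>>>
>>>>>   // Peek at the windowing strategy to decide whether the transform
>>>>>   // must cope with multiple pane firings per window.
>>>>>   WindowingStrategy<?, ?> ws = input.getWindowingStrategy();
>>>>>   boolean mayFireMultipleTimes =
>>>>>       !(ws.getTrigger() instanceof DefaultTrigger)
>>>>>           || ws.getAllowedLateness().isLongerThan(Duration.ZERO);
>>>>>   if (mayFireMultipleTimes) {
>>>>>     // e.g. emit mergeable partial results instead of final values
>>>>>   }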
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I'll say that synchronized processing time has confused users before.
>>>>>> Users sometimes use processing-time triggers to optimize latency,
>>>>>> banking on them decoupling a stage's latency from the long-tail
>>>>>> latency of previous stages. However, continuation triggers silently
>>>>>> switching to synchronized processing time has defeated that, and it
>>>>>> wasn't clear to users why.
>>>>>>
>>>>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Of course the right answer is to just implement sink triggers and
>>>>>>>>> sidestep the question altogether :).
>>>>>>>>>
>>>>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime
>>>>>>>>> in the model makes the most sense, and runners can choose an
>>>>>>>>> implementation between firing eagerly and waiting some amount of
>>>>>>>>> time until they think all (most?) downstream results are in before
>>>>>>>>> firing, depending on how smart the runner wants to be. As you
>>>>>>>>> point out, they're all correct, we'll have multiple firings due to
>>>>>>>>> the upstream trigger anyway, and this is safer than it used to be
>>>>>>>>> (though it still possibly requires work).
>>>>>>>>>
>>>>>>>>
>>>>>>>> Just to clarify, as I got a little confused, is your suggestion:
>>>>>>>> Leave AfterSynchronizedProcessingTime* triggers in the model/proto,
>>>>>>>> let the SDK put them in where they want, and let runners decide how
>>>>>>>> to interpret them? (this SGTM and requires the least/no changes)
>>>>>>>>
>>>>>>>
>>>>>>> Yep. We may want to update Python/Go to produce
>>>>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime
>>>>>>> triggers too, eventually, to better express intent.
>>>>>>>
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related
>>>>>>>> to this, except in implementation, and should be removed either way.
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> TL;DR:
>>>>>>>>>> 1. should we replace "after synchronized processing time" with
>>>>>>>>>> "after count 1"?
>>>>>>>>>> 2. should we remove "continuation trigger" and leave this to
>>>>>>>>>> runners?
>>>>>>>>>>
>>>>>>>>>> ----
>>>>>>>>>>
>>>>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve
>>>>>>>>>> a specific problem. They are inconsistent across SDKs today.
>>>>>>>>>>
>>>>>>>>>>  - You have an aggregation/GBK with aligned processing time
>>>>>>>>>> trigger like ("output every minute on the minute")
>>>>>>>>>>  - You have a downstream aggregation/GBK between that and the sink
>>>>>>>>>>  - You expect to have about one output every minute per
>>>>>>>>>> key+window pair
>>>>>>>>>>
>>>>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>>>>> key+window of the downstream aggregation. The
>>>>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the
>>>>>>>>>> processing-time-based triggers to fire and commit their outputs.
>>>>>>>>>> The downstream aggregation will output as fast as possible in
>>>>>>>>>> panes consistent with the upstream aggregation.
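>>>>>>>>>>
>>>>>>>>>> (Roughly, in the Java SDK - a sketch, with illustrative transform
>>>>>>>>>> choices, assuming a PCollection<KV<String, Long>> named "input":)
>>>>>>>>>>
>>>>>>>>>>   PCollection<KV<String, Long>> upstream = input
>>>>>>>>>>       .apply(Window.<KV<String, Long>>into(
>>>>>>>>>>               FixedWindows.of(Duration.standardHours(1)))
>>>>>>>>>>           .triggering(Repeatedly.forever(
>>>>>>>>>>               AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>>                   .alignedTo(Duration.standardMinutes(1))))
>>>>>>>>>>           .withAllowedLateness(Duration.ZERO)
>>>>>>>>>>           .discardingFiredPanes())
>>>>>>>>>>       // first GBK: fires "every minute on the minute"
>>>>>>>>>>       .apply(Sum.longsPerKey());
>>>>>>>>>>   PCollection<Long> downstream = upstream
>>>>>>>>>>       .apply(Values.create())
>>>>>>>>>>       // second GBK: the Java SDK hands it the
>>>>>>>>>>       // AfterSynchronizedProcessingTime continuation trigger
>>>>>>>>>>       .apply(Sum.longsGlobally().withoutDefaults());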
>>>>>>>>>>
>>>>>>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>>>>>>> reasonable".
>>>>>>>>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>>>>> triggers but simply propagates the same trigger to the next GBK,
>>>>>>>>>> creating additional delay.
>>>>>>>>>>  - I don't know what the Go SDK may do, if it supports this at
>>>>>>>>>> all.
>>>>>>>>>>
>>>>>>>>>> Any behavior could be defined as "correct". A simple option could
>>>>>>>>>> be to have the downstream aggregation "fire always", aka "after
>>>>>>>>>> element count 1". How would this change things? We would
>>>>>>>>>> potentially see many more outputs.
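>>>>>>>>>>
>>>>>>>>>> (In Java terms, that downstream trigger would be, roughly:)
>>>>>>>>>>
>>>>>>>>>>   // Emit a pane for (more or less) every incoming element.
>>>>>>>>>>   Repeatedly.forever(AfterPane.elementCountAtLeast(1))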
>>>>>>>>>>
>>>>>>>>>> Why did we do this in the first place? There are (at least) these
>>>>>>>>>> reasons:
>>>>>>>>>>
>>>>>>>>>>  - Previously, triggers could "finish" an aggregation, thus
>>>>>>>>>> dropping all further data. In this case, waiting for all outputs
>>>>>>>>>> is critical or else you lose data. Now triggers cannot finish
>>>>>>>>>> aggregations.
>>>>>>>>>>  - Whenever there may be more than one pane, a user has to write
>>>>>>>>>> logic to compensate and deal with it. Changing from a guaranteed
>>>>>>>>>> single pane to multi-pane would break things. So if the user
>>>>>>>>>> configures a single firing, all downstream aggregations must
>>>>>>>>>> respect it. Now that triggers cannot finish, I think processing
>>>>>>>>>> time can only be used in multi-pane contexts anyhow.
>>>>>>>>>>  - The above example illustrates how the behavior in Java
>>>>>>>>>> maintains something that the user will expect. Or so we think.
>>>>>>>>>> Maybe users don't care.
>>>>>>>>>>
>>>>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>>>>> specifies triggering, it applies to the very nearest
>>>>>>>>>> aggregation/GBK. The SDK decides what triggering to insert
>>>>>>>>>> downstream. One possibility is to remove this and have it
>>>>>>>>>> unspecified, left to runner behavior.
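>>>>>>>>>>
>>>>>>>>>> (In the Java SDK the insertion happens via
>>>>>>>>>> Trigger#getContinuationTrigger - a sketch:)
>>>>>>>>>>
>>>>>>>>>>   Trigger userTrigger = Repeatedly.forever(
>>>>>>>>>>       AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>>           .plusDelayOf(Duration.standardMinutes(1)));
>>>>>>>>>>   // Applied at the first GBK; for downstream GBKs the SDK uses:
>>>>>>>>>>   Trigger downstream = userTrigger.getContinuationTrigger();
>>>>>>>>>>   // which for a processing-time trigger is (roughly)
>>>>>>>>>>   // Repeatedly.forever(<an AfterSynchronizedProcessingTime>)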
>>>>>>>>>>
>>>>>>>>>> I think maybe these pieces of complexity are both not helpful and
>>>>>>>>>> also not (necessarily) breaking changes to alter, especially
>>>>>>>>>> considering we have inconsistency in the model.
>>>>>>>>>>
>>>>>>>>>> WDYT? And I wonder what this means for xlang and portability...
>>>>>>>>>> how does continuation triggering even work? (if at all)
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>
