Just for the thread I want to comment on another, more drastic approach:
eliminate continuation triggers from the model, leaving downstream
triggering up to a runner. This approach is not viable because transforms
may need to change their behavior based on whether or not a trigger will
fire more than once. Transforms can and do inspect the windowing strategy
to do things differently.
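
A minimal sketch of that dependency, in Python with entirely hypothetical
names (`Trigger`, `WindowingStrategy`, and `plan_aggregation` are
illustrative only and are not Beam APIs). It assumes a transform can ask
the windowing strategy whether its trigger may fire more than once, and
picks a different expansion accordingly. That choice could not be made at
expansion time if downstream triggering were left entirely to the runner.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Trigger:
    # Hypothetical stand-in for a trigger; not a Beam class.
    name: str
    may_fire_more_than_once: bool


@dataclass(frozen=True)
class WindowingStrategy:
    # Hypothetical stand-in for the strategy attached to a collection.
    trigger: Trigger


def plan_aggregation(strategy: WindowingStrategy) -> str:
    """Choose an expansion based on the trigger in the windowing strategy."""
    if strategy.trigger.may_fire_more_than_once:
        # Several panes per key+window are possible, so the transform must
        # treat each output as a partial result and compensate downstream.
        return "multi-pane"
    # Exactly one pane per key+window, so no compensation logic is needed.
    return "single-pane"


single = WindowingStrategy(Trigger("fires-once", False))
repeated = WindowingStrategy(Trigger("fires-repeatedly", True))
print(plan_aggregation(single), plan_aggregation(repeated))
```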

Kenn

On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:

> I'll say that synchronized processing time has confused users before.
> Users sometimes use processing-time triggers to optimize latency, counting
> on them to decouple stage latency from the long-tail latency of
> previous stages. However, continuation triggers silently switching to
> synchronized processing time has defeated that, and it wasn't clear to
> users why.
>
> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Of course the right answer is to just implement sink triggers and
>>>> sidestep the question altogether :).
>>>>
>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime in the
>>>> model makes the most sense, and runners can choose an implementation
>>>> between firing eagerly and waiting some amount of time until they think all
>>>> (most?) downstream results are in before firing, depending on how smart the
>>>> runner wants to be. As you point out, they're all correct, and we'll have
>>>> multiple firings due to the upstream trigger anyway, and this is safer than
>>>> it used to be (though still possibly requires work).
>>>>
>>>
>>> Just to clarify, as I got a little confused, is your suggestion: Leave
>>> AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK
>>> put them in where they want, and let runners decide how to interpret them?
>>> (this SGTM and requires the least/no changes)
>>>
>>
>> Yep. We may want to update Python/Go to produce
>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
>> eventually, to better express intent.
>>
>>
>>> Kenn
>>>
>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to
>>> this, except in implementation, and should be removed either way.
>>>
>>>
>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> TL;DR:
>>>>> 1. should we replace "after synchronized processing time" with "after
>>>>> count 1"?
>>>>> 2. should we remove "continuation trigger" and leave this to runners?
>>>>>
>>>>> ----
>>>>>
>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>>>> specific problem. They are inconsistent across SDKs today.
>>>>>
>>>>>  - You have an aggregation/GBK with an aligned processing-time trigger
>>>>> (e.g. "output every minute on the minute")
>>>>>  - You have a downstream aggregation/GBK between that and the sink
>>>>>  - You expect to have about one output every minute per key+window pair
>>>>>
>>>>> Any output of the upstream aggregation may contribute to any
>>>>> key+window of the downstream aggregation. The
>>>>> AfterSynchronizedProcessingTime trigger waits for all the processing time
>>>>> based triggers to fire and commit their outputs. The downstream
>>>>> aggregation will output as fast as possible in panes consistent with
>>>>> the upstream aggregation.
>>>>>
>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>> reasonable".
>>>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>> triggers but simply propagates the same trigger to the next GBK, creating
>>>>> additional delay.
>>>>>  - I don't know what the Go SDK may do, if it supports this at all.
>>>>>
>>>>> Any behavior could be defined as "correct". A simple option could be
>>>>> to have the downstream aggregation "fire always" aka "after element count
>>>>> 1". How would this change things? We would potentially see many more
>>>>> outputs.
>>>>>
>>>>> Why did we do this in the first place? There are (at least) these
>>>>> reasons:
>>>>>
>>>>>  - Previously, triggers could "finish" an aggregation thus dropping
>>>>> all further data. In this case, waiting for all outputs is critical or
>>>>> else you lose data. Now triggers cannot finish aggregations.
>>>>>  - Whenever there may be more than one pane, a user has to write logic
>>>>> to compensate and deal with it. Changing from guaranteed single pane to
>>>>> multi-pane would break things. So if the user configures a single firing,
>>>>> all downstream aggregations must respect it. Now that triggers cannot
>>>>> finish, I think processing time can only be used in multi-pane contexts
>>>>> anyhow.
>>>>>  - The above example illustrates how the behavior in Java maintains
>>>>> something that the user will expect. Or so we think. Maybe users don't
>>>>> care.
>>>>>
>>>>> How did we get into this inconsistent state? When the user specifies
>>>>> triggering it applies to the very nearest aggregation/GBK. The SDK decides
>>>>> what triggering to insert downstream. One possibility is to remove this
>>>>> and have it unspecified, left to runner behavior.
>>>>>
>>>>> I think maybe these pieces of complexity are both not helpful and also
>>>>> not (necessarily) breaking changes to alter, especially considering we
>>>>> have inconsistency in the model.
>>>>>
>>>>> WDYT? And I wonder what this means for xlang and portability... how
>>>>> does continuation triggering even work? (if at all)
>>>>>
>>>>> Kenn
>>>>>
>>>>
