On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com> wrote:

> I would prefer to leave downstream triggering up to the runner (or,
> better, leave upstream triggering up to the runner, a la sink triggers),
> but one problem is that without an explicit AfterSynchronizedProcessingTime
> one can't tell if the downstream ProcessingTime between two groupings is
> due to an explicit re-triggering between them or inherited from one to the
> other.
>

I mean to propose that there should be no triggering specified unless due
to explicit re-triggering.

(and BTW yes I agree about sink triggers, but we need retractions and
probably some theoretical work before we can aim for that)

Kenn


> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Just for the thread I want to comment on another, more drastic approach:
>> eliminate continuation triggers from the model, leaving downstream
>> triggering up to a runner. This approach is not viable because transforms
>> may need to change their behavior based on whether or not a trigger will
>> fire more than once. Transforms can and do inspect the windowing strategy
>> to do things differently.
>>
>> Kenn
>>
>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I'll say that synchronized processing time has confused users before.
>>> Users sometimes use processing-time triggers to optimize latency, banking
>>> that this will decouple stage latency from the long-tail latency of
>>> previous stages. However, continuation triggers silently switching to
>>> synchronized processing time has defeated that, and it wasn't clear to
>>> users why.
>>>
>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Of course the right answer is to just implement sink triggers and
>>>>>> sidestep the question altogether :).
>>>>>>
>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime in
>>>>>> the model makes the most sense, and runners can choose an implementation
>>>>>> between firing eagerly and waiting some amount of time until they think
>>>>>> all (most?) downstream results are in before firing, depending on how
>>>>>> smart the runner wants to be. As you point out, they're all correct, and
>>>>>> we'll have multiple firings due to the upstream trigger anyway, and this
>>>>>> is safer than it used to be (though still possibly requires work).
>>>>>>
>>>>>
>>>>> Just to clarify, as I got a little confused, is your suggestion: Leave
>>>>> AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK
>>>>> put them in where they want, and let runners decide how to interpret them?
>>>>> (this SGTM and requires the least/no changes)
>>>>>
>>>>
>>>> Yep. We may want to update Python/Go to produce
>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
>>>> eventually, to better express intent.
>>>>
>>>>
>>>>> Kenn
>>>>>
>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to
>>>>> this, except in implementation, and should be removed either way.
>>>>>
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> TL;DR:
>>>>>>> 1. should we replace "after synchronized processing time" with
>>>>>>> "after count 1"?
>>>>>>> 2. should we remove "continuation trigger" and leave this to runners?
>>>>>>>
>>>>>>> ----
>>>>>>>
>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>>>>>> specific problem. They are inconsistent across SDKs today.
>>>>>>>
>>>>>>>  - You have an aggregation/GBK with an aligned processing-time trigger
>>>>>>> (like "output every minute on the minute")
>>>>>>>  - You have a downstream aggregation/GBK between that and the sink
>>>>>>>  - You expect to have about one output every minute per key+window
>>>>>>> pair
>>>>>>>
>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>> key+window of the downstream aggregation. The
>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the
>>>>>>> processing-time-based triggers to fire and commit their outputs. The
>>>>>>> downstream aggregation will output as fast as possible in panes
>>>>>>> consistent with the upstream aggregation.
>>>>>>>
>>>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>>>> reasonable".
>>>>>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>> triggers but simply propagates the same trigger to the next GBK,
>>>>>>> creating additional delay.
>>>>>>>  - I don't know what the Go SDK may do, if it supports this at all.
>>>>>>>
>>>>>>> Any behavior could be defined as "correct". A simple option could be
>>>>>>> to have the downstream aggregation "fire always" aka "after element
>>>>>>> count 1". How would this change things? We would potentially see many
>>>>>>> more outputs.
>>>>>>>
>>>>>>> Why did we do this in the first place? There are (at least) these
>>>>>>> reasons:
>>>>>>>
>>>>>>>  - Previously, triggers could "finish" an aggregation thus dropping
>>>>>>> all further data. In this case, waiting for all outputs is critical or
>>>>>>> else you lose data. Now triggers cannot finish aggregations.
>>>>>>>  - Whenever there may be more than one pane, a user has to write
>>>>>>> logic to compensate and deal with it. Changing from guaranteed single
>>>>>>> pane to multi-pane would break things. So if the user configures a
>>>>>>> single firing, all downstream aggregations must respect it. Now that
>>>>>>> triggers cannot finish, I think processing time can only be used in
>>>>>>> multi-pane contexts anyhow.
>>>>>>>  - The above example illustrates how the behavior in Java maintains
>>>>>>> something that the user will expect. Or so we think. Maybe users don't
>>>>>>> care.
>>>>>>>
>>>>>>> How did we get into this inconsistent state? When the user specifies
>>>>>>> triggering it applies to the very nearest aggregation/GBK. The SDK
>>>>>>> decides what triggering to insert downstream. One possibility is to
>>>>>>> remove this and have it unspecified, left to runner behavior.
>>>>>>>
>>>>>>> I think maybe these pieces of complexity are both not helpful and
>>>>>>> also not (necessarily) breaking changes to alter, especially
>>>>>>> considering we have inconsistency in the model.
>>>>>>>
>>>>>>> WDYT? And I wonder what this means for xlang and portability... how
>>>>>>> does continuation triggering even work? (if at all)
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>
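
For readers following the thread, here is a minimal sketch of the three downstream ("continuation") behaviors discussed above for a processing-time trigger: the Java SDK behavior, the Python SDK behavior, and the proposed "after count 1" option. It is a toy model in plain Python, not Beam code; the constant names, the `continuation_trigger` function, and the policy labels are all hypothetical and exist only for illustration.

```python
# Toy model of the continuation-trigger choices described in the thread.
# These names are hypothetical labels, NOT Apache Beam APIs.
PROCESSING_TIME = "AfterProcessingTime"
SYNCHRONIZED = "AfterSynchronizedProcessingTime"
FIRE_ALWAYS = "AfterCount(1)"


def continuation_trigger(upstream: str, policy: str) -> str:
    """Return the trigger a downstream GBK would receive under a policy."""
    if upstream != PROCESSING_TIME:
        # The thread only discusses the processing-time case; this sketch
        # passes every other trigger through unchanged.
        return upstream
    if policy == "java":
        # Wait for upstream processing-time firings, then output promptly.
        return SYNCHRONIZED
    if policy == "python":
        # Propagate the same trigger, which adds delay at the next GBK.
        return PROCESSING_TIME
    if policy == "fire_always":
        # Option 1 from the TL;DR: fire on every element.
        return FIRE_ALWAYS
    raise ValueError(f"unknown policy: {policy}")


for policy in ("java", "python", "fire_always"):
    print(policy, "->", continuation_trigger(PROCESSING_TIME, policy))
```

The sketch only captures which trigger ends up downstream; how a runner interprets that trigger (eagerly or after waiting) is the part the thread proposes leaving to the runner.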
