Hi all,

TL;DR:
1. should we replace "after synchronized processing time" with "after count
1"?
2. should we remove "continuation trigger" and leave this to runners?

----

"AfterSynchronizedProcessingTime" triggers were invented to solve a
specific problem. They are inconsistent across SDKs today.

 - You have an aggregation/GBK with aligned processing time trigger like
("output every minute on the minute")
 - You have a downstream aggregation/GBK between that and the sink
 - You expect to have about one output every minute per key+window pair

Any output of the upstream aggregation may contribute to any key+window of
the downstream aggregation. The AfterSynchronizedProcessingTime trigger
waits for all the processing time based triggers to fire and commit their
outputs. The downstream aggregation will output as fast as possible in
panes consistent with the upstream aggregation.

 - The Java SDK behavior is as above, to output "as fast as reasonable".
 - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers but
simply propagates the same trigger to the next GBK, creating additional
delay.
 - I don't know what the Go SDK may do, if it supports this at all.

Any behavior could be defined as "correct". A simple option could be to
have the downstream aggregation "fire always" aka "after element count 1".
How would this change things? We would potentially see many more outputs.

Why did we do this in the first place? There are (at least) these reasons:

 - Previously, triggers could "finish" an aggregation thus dropping all
further data. In this case, waiting for all outputs is critical or else you
lose data. Now triggers cannot finish aggregations.
 - Whenever there may be more than one pane, a user has to write logic to
compensate and deal with it. Changing from guaranteed single pane to
multi-pane would break things. So if the user configures a single firing,
all downstream aggregations must respect it. Now that triggers cannot
finish, I think processing time can only be used in multi-pane contexts
anyhow.
 - The above example illustrates how the behavior in Java maintains
something that the user will expect. Or so we think. Maybe users don't care.

How did we get into this inconsistent state? When the user specifies
triggering it applies to the very nearest aggregation/GBK. The SDK decides
what triggering to insert downstream. One possibility is to remove this and
have it unspecified, left to runner behavior.

I think maybe these pieces of complexity are both not helpful and also not
(necessarily) breaking changes to alter, especially considering we have
inconsistency in the model.

WDYT? And I wonder what this means for xlang and portability... how does
continuation triggering even work? (if at all)

Kenn

Reply via email to