On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:
On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com> wrote:
I would prefer to leave downstream triggering up to the runner (or, better, leave upstream triggering up to the runner, a la sink triggers), but one problem is that without an explicit AfterSynchronizedProcessingTime one can't tell whether a ProcessingTime trigger between two groupings is due to explicit re-triggering between them or was inherited from the upstream grouping.
I mean to propose that there should be no triggering specified unless due to explicit re-triggering.
You're saying that we leave the trigger (and perhaps other) fields of the WindowingStrategy attached to PCollections downstream of the first GBK unset in the proto? And let runners walk over the graph to infer it? I could be OK with making this legal, though updating all SDKs and runners to handle this doesn't seem high priority at the moment.

(And BTW, yes, I agree about sink triggers, but we need retractions and probably some theoretical work before we can aim for that.)
Kenn
On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:
Just for the thread I want to comment on another, more drastic approach: eliminate continuation triggers from the model, leaving downstream triggering up to a runner.
This approach is not viable because transforms may need to change their behavior based on whether or not a trigger will fire more than once. Transforms can and do inspect the windowing strategy to do things differently.
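For anyone following along, a minimal sketch of the kind of inspection meant here. The single-pane test and class name are illustrative, not a canonical Beam helper:

    import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.WindowingStrategy;
    import org.joda.time.Duration;

    class TriggerInspection {
      // Illustrative check: with the default trigger and zero allowed
      // lateness, each key+window produces at most one pane, so a
      // transform could skip its multi-pane compensation logic.
      static boolean isSinglePane(PCollection<?> input) {
        WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
        return strategy.getTrigger() instanceof DefaultTrigger
            && strategy.getAllowedLateness().isEqual(Duration.ZERO);
      }
    }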
Kenn
On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
I'll say that synchronized processing time has confused users before. Users sometimes use processing-time triggers to optimize latency, banking that this will decouple a stage's latency from the long-tail latency of previous stages. However, continuation triggers silently switching to synchronized processing time defeats that, and it wasn't clear to users why.
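To make the silent switch concrete, this is roughly how it surfaces in the Java SDK (a sketch; getContinuationTrigger() is how the Java SDK derives the downstream trigger, and the exact printed form may vary by version):

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Trigger;
    import org.joda.time.Duration;

    public class ContinuationDemo {
      public static void main(String[] args) {
        // What the user writes, hoping each stage fires ~10s after input:
        Trigger userTrigger =
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10)));
        // What the SDK applies downstream of the first GBK: a repeated
        // AfterSynchronizedProcessingTime, which ties this stage's
        // firings back to the upstream stage's firings.
        System.out.println(userTrigger.getContinuationTrigger());
      }
    }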
On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com> wrote:
On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:
Of course the right answer is to just implement sink triggers and sidestep the question altogether :).
In the meantime, I think leaving AfterSynchronizedProcessingTime in the model makes the most sense, and runners can choose an implementation between firing eagerly and waiting some amount of time until they think all (most?) downstream results are in before firing, depending on how smart the runner wants to be. As you point out, they're all correct, we'll have multiple firings due to the upstream trigger anyway, and this is safer than it used to be (though it still possibly requires work).
Just to clarify, as I got a little confused: is your suggestion to leave AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK put them in where it wants, and let runners decide how to interpret them? (This SGTM and requires the least/no changes.)
Yep. We may want to update Python/Go to produce AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too, eventually, to better express intent.
Kenn
*noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to this, except in implementation, and should be removed either way.
On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
Hi all,
TL;DR:
1. Should we replace "after synchronized processing time" with "after count 1"?
2. Should we remove "continuation trigger" and leave this to runners?
----
"AfterSynchronizedProcessingTime"
triggers were invented to solve a
specific problem. They are
inconsistent across SDKs today.
- You have an aggregation/GBK with an aligned processing-time trigger ("output every minute on the minute").
- You have a downstream aggregation/GBK between that and the sink.
- You expect to have about one output every minute per key+window pair.
Any output of the upstream aggregation may contribute to any key+window of the downstream aggregation. The AfterSynchronizedProcessingTime trigger waits for all the processing-time-based triggers to fire and commit their outputs. The downstream aggregation will output as fast as possible in panes consistent with the upstream aggregation.
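In Java, the setup above looks roughly like this (a sketch: Count.perKey stands in for any aggregation, and the bounded Create input is only there to keep the example self-contained; a real pipeline would read an unbounded source):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class SyncProcTimeExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<KV<String, String>> input =
            p.apply(Create.of(KV.of("user", "click")));

        PCollection<KV<String, Long>> upstream =
            input
                .apply(Window.<KV<String, String>>into(
                        FixedWindows.of(Duration.standardHours(1)))
                    .triggering(Repeatedly.forever(
                        // "output every minute on the minute"
                        AfterProcessingTime.pastFirstElementInPane()
                            .alignedTo(Duration.standardMinutes(1))))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
                .apply(Count.perKey());  // first aggregation/GBK

        // The second aggregation gets the SDK-chosen continuation
        // trigger; in Java that is AfterSynchronizedProcessingTime, so
        // panes here still arrive about once a minute per key+window.
        PCollection<KV<String, Long>> downstream =
            upstream.apply(Count.perKey());  // second aggregation/GBK

        p.run();
      }
    }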
- The Java SDK behavior is as above, to output "as fast as reasonable".
- The Python SDK never uses "AfterSynchronizedProcessingTime" triggers but simply propagates the same trigger to the next GBK, creating additional delay.
- I don't know what the Go SDK may do, if it supports this at all.
Any behavior could be defined as "correct". A simple option could be to have the downstream aggregation "fire always", aka "after element count 1". How would this change things? We would potentially see many more outputs.
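Concretely, in Java terms the downstream grouping would behave as if triggered like this (sketch):

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Trigger;

    class FireAlways {
      // "Fire always": emit a new pane whenever at least one element has
      // arrived for the key+window, instead of waiting for upstream
      // processing-time firings to synchronize.
      static final Trigger FIRE_ALWAYS =
          Repeatedly.forever(AfterPane.elementCountAtLeast(1));
    }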
Why did we do this in the first place? There are (at least) these reasons:

- Previously, triggers could "finish" an aggregation, thus dropping all further data. In this case, waiting for all outputs is critical or else you lose data. Now triggers cannot finish aggregations.
- Whenever there may be more than one pane, a user has to write logic to compensate and deal with it. Changing from a guaranteed single pane to multi-pane would break things. So if the user configures a single firing, all downstream aggregations must respect it. Now that triggers cannot finish, I think processing time can only be used in multi-pane contexts anyhow.
- The above example illustrates how the behavior in Java maintains something that the user will expect. Or so we think. Maybe users don't care.
How did we get into this inconsistent state? When the user specifies triggering, it applies to the nearest aggregation/GBK. The SDK decides what triggering to insert downstream. One possibility is to remove this and leave it unspecified, up to runner behavior.
I think these pieces of complexity are both unhelpful and also not (necessarily) breaking changes to alter, especially considering we already have inconsistency in the model.
WDYT? And I wonder what this means for xlang and portability... how does continuation triggering even work? (If at all.)
Kenn