On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:

On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com> wrote:
I would prefer to leave downstream triggering up to the runner (or, better, leave upstream triggering up to the runner, a la sink triggers), but one problem is that without an explicit AfterSynchronizedProcessingTime one can't tell whether a downstream ProcessingTime trigger between two groupings is due to an explicit re-triggering between them or inherited from one to the other.
I mean to propose that there should be no triggering specified unless due to explicit re-triggering.
You're saying that we leave the trigger (and perhaps other) fields of the WindowingStrategy attached to PCollections downstream of the first GBK unset in the proto, and let runners walk over the graph to infer it? I could be OK with making this legal, though updating all SDKs and runners to handle this doesn't seem high priority at the moment.
(And BTW, yes, I agree about sink triggers, but we need retractions and probably some theoretical work before we can aim for that.)

Kenn
On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:
Just for the thread, I want to comment on another, more drastic approach: eliminate continuation triggers from the model, leaving downstream triggering up to a runner. This approach is not viable because transforms may need to change their behavior based on whether or not a trigger will fire more than once. Transforms can and do inspect the windowing strategy to do things differently.
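For example, a rough sketch of that kind of inspection in the Java SDK (the class name and the exact check are illustrative assumptions, not code from the thread):

    import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
    import org.apache.beam.sdk.transforms.windowing.Trigger;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.WindowingStrategy;

    class TriggerInspection {
      // Conservative check a transform might make before relying on
      // "exactly one pane per window": anything other than the default
      // watermark trigger with zero allowed lateness may produce multiple
      // panes, so the transform has to behave differently (or reject input).
      static boolean mayProduceMultiplePanes(PCollection<?> input) {
        WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
        Trigger trigger = strategy.getTrigger();
        return !(trigger instanceof DefaultTrigger)
            || strategy.getAllowedLateness().getMillis() > 0;
      }
    }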
Kenn
On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
I'll say that synchronized processing time has confused users before. Users sometimes use processing-time triggers to optimize latency, banking that this will decouple stage latency from the long-tail latency of previous stages. However, continuation triggers silently switching to synchronized processing time has defeated that, and it wasn't clear to users why.
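A minimal Java sketch of that pattern (the pipeline shape and identifiers are assumptions for illustration, not from the thread): the user sets a processing-time trigger hoping to bound this stage's latency, but in the Java SDK any downstream aggregation gets an "after synchronized processing time" continuation trigger, which waits on the upstream firings and re-couples the stages.

    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class LatencyTriggerExample {
      static PCollection<KV<String, Long>> aggregate(PCollection<KV<String, Long>> events) {
        return events
            // The user's intent: emit a pane roughly every 30s of processing
            // time so this stage's latency stays bounded regardless of
            // upstream stragglers.
            .apply(
                Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(Sum.longsPerKey());
        // A further GroupByKey/Combine applied to this result does not get the
        // trigger above verbatim: its continuation trigger is "after
        // synchronized processing time", which waits on the upstream
        // processing-time firings -- the silent switch described here.
      }
    }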
On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com> wrote:

On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:

On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:
Of course the right answer is to just implement sink triggers and sidestep the question altogether :). In the meantime, I think leaving AfterSynchronizedProcessingTime in the model makes the most sense, and runners can choose an implementation between firing eagerly and waiting some amount of time until they think all (most?) downstream results are in before firing, depending on how smart the runner wants to be. As you point out, they're all correct, we'll have multiple firings due to the upstream trigger anyway, and this is safer than it used to be (though still possibly requires work).
Just to clarify, as I got a little confused, is your suggestion: leave AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK put them in where they want, and let runners decide how to interpret them? (This SGTM and requires the least/no changes.)
Yep. We may want to update Python/Go to produce AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too, eventually, to better express intent.
Kenn
*noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to this, except in implementation, and should be removed either way.
On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
Hi all,

TL;DR:
1. Should we replace "after synchronized processing time" with "after count 1"?
2. Should we remove "continuation trigger" and leave this to runners?

----

"AfterSynchronizedProcessingTime" triggers were invented to solve a specific problem. They are inconsistent across SDKs today.
- You have an aggregation/GBK with an aligned processing time trigger, like "output every minute on the minute"
- You have a downstream aggregation/GBK between that and the sink
- You expect to have about one output every minute per key+window pair

Any output of the upstream aggregation may contribute to any key+window of the downstream aggregation. The AfterSynchronizedProcessingTime trigger waits for all the processing time based triggers to fire and commit their outputs. The downstream aggregation will output as fast as possible in panes consistent with the upstream aggregation.
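To make the shape concrete, here is a minimal Java sketch (the concrete transforms and names are illustrative assumptions, not from the original message): an upstream aggregation triggered on aligned processing time, followed by a second aggregation before the sink, which in the Java SDK gets the synchronized-processing-time continuation trigger described above.

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class SynchronizedTriggerScenario {
      static PCollection<KV<String, Iterable<Long>>> run(PCollection<KV<String, Long>> events) {
        // Upstream aggregation: user-specified trigger, roughly
        // "output every minute on the minute" via alignment to a 1-minute period.
        PCollection<KV<String, Long>> upstream =
            events
                .apply(
                    Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardHours(1)))
                        .triggering(
                            Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .alignedTo(Duration.standardMinutes(1))))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())
                .apply(Count.perKey());

        // Downstream aggregation between the first one and the sink. The user
        // sets no trigger here; the SDK picks the continuation trigger. In Java
        // that is AfterSynchronizedProcessingTime, so this GBK waits for the
        // upstream minute-aligned firings to complete and then emits, giving
        // about one pane per minute per key+window.
        return upstream.apply(GroupByKey.create());
      }
    }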
- The Java SDK behavior is as above, to output "as fast as reasonable".
- The Python SDK never uses "AfterSynchronizedProcessingTime" triggers but simply propagates the same trigger to the next GBK, creating additional delay.
- I don't know what the Go SDK may do, if it supports this at all.
Any behavior could be defined as "correct". A simple option could be to have the downstream aggregation "fire always", aka "after element count 1". How would this change things? We would potentially see many more outputs.
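In Java trigger terms, that option would amount to a continuation trigger like the following (a sketch of the proposal, not current SDK behavior):

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Trigger;

    class FireAlwaysContinuation {
      // "After element count 1", repeated forever: the downstream aggregation
      // emits a pane as soon as any input arrives, rather than waiting for the
      // upstream processing-time firings to be synchronized.
      static final Trigger FIRE_ALWAYS =
          Repeatedly.forever(AfterPane.elementCountAtLeast(1));
    }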
Why did we do this in the first place? There are (at least) these reasons:

- Previously, triggers could "finish" an aggregation, thus dropping all further data. In this case, waiting for all outputs is critical or else you lose data. Now triggers cannot finish aggregations.
- Whenever there may be more than one pane, a user has to write logic to compensate and deal with it. Changing from a guaranteed single pane to multi-pane would break things. So if the user configures a single firing, all downstream aggregations must respect it. Now that triggers cannot finish, I think processing time can only be used in multi-pane contexts anyhow.
- The above example illustrates how the behavior in Java maintains something that the user will expect. Or so we think. Maybe users don't care.
How did we get into this inconsistent state? When the user specifies triggering, it applies to the very nearest aggregation/GBK. The SDK decides what triggering to insert downstream. One possibility is to remove this and have it unspecified, left to runner behavior.
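Concretely, in the Java SDK the downstream trigger is derived per trigger; a small sketch assuming the Trigger#getContinuationTrigger() entry point (printed representation varies by SDK version):

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Trigger;
    import org.joda.time.Duration;

    class ContinuationTriggerDemo {
      public static void main(String[] args) {
        // The trigger the user specifies on the nearest aggregation/GBK...
        Trigger userSpecified =
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1)));

        // ...and the trigger the Java SDK derives for aggregations downstream
        // of it. For processing-time triggers this is the synchronized
        // processing time variant discussed in this thread.
        System.out.println(userSpecified.getContinuationTrigger());
      }
    }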
I think maybe these pieces of complexity are both not helpful and also not (necessarily) breaking changes to alter, especially considering we have inconsistency in the model.

WDYT? And I wonder what this means for xlang and portability... how does continuation triggering even work? (If at all.)
Kenn