[
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065235#comment-16065235
]
Kenneth Knowles commented on BEAM-2140:
---------------------------------------
I considered for a long time what should happen with processing time triggers
as far as window expiry. We spent quite some time coming up with the semantics
at https://s.apache.org/beam-lateness#heading=h.hot1g47sz45s, long before Beam.
I don't claim it is perfect (it is way too complex, for one) but it represents
a lot of thought by lots of people. I think actually it does give some choices.
* an input being droppable does not necessarily mean you are required to drop
it (some transforms may falter on droppable inputs, but that is specific to the
transform)
* input timestamp and output timestamp are decoupled, so you can reason about
whether to ignore input based on whether the resulting output would be droppable
Some possibilities that I think don't break the model:
*Treat processing time timers as inputs with some timestamp at EOW or some such*
The theme that timers are inputs is basically valid. We gain clarity by not
conflating them in APIs and discussions. But how they interact with watermarks,
etc, should be basically compatible. Currently processing time timers are
treated as inputs with a timestamp equal to the input watermark at the moment
of their arrival. So this change would cause an input hold because there is a
known upcoming element that just hasn't arrived.
In streaming: this holds things up too much. It also makes repeatedly firing
after processing time cause an infinite loops, versus what happens today where
it naturally goes through window expiry and GC.
In batch: this breaks the unified model for processing historical data in a
batch mode. With the semantics as they exist today, the way that batch "runs"
triggers and processing time timers (by ignoring them) is completely compatible
with the semantics. So any user who writes a correct transform has good
assurances they it will work in both modes. If processing time timers held
watermarks like this they would need to be processed in batch mode, yet they
are contradictory with the whole point of it.
We can omit unbounded SDFs from this unification issue, probably, but a
bounded-per-element SDF should certainly work on streamed unbounded input as
well as bounded input.
*Decide whether to drop a processing time timer not based on the input
watermark but based on whether its output would be droppable*
This lets the input watermark advance, but still does not allow infinitely
repeating processing time timers to terminate with window expiry automatically,
and it still breaks the unified model. We could alleviate both issues by
refusing to set new timers that would already be expired. I think this is just
a rabbit hole of unnatural corner cases so we should avoid it.
*In addition to the processing time timers that ProcessFn sets, also set a GC
timer*
This seems straightforward and a simple and good idea. These timers are also
still run in batch mode for historical reprocessing.
Can you clarify how it does not work? Is it because you need to create a "loop"
that continues to fire until the residual is gone? Currently, there is simply
no way to make a perpetual loop with timers because of the commentary below.
*Treat event time timers as inputs with their given timestamp*
This would combine the GC timer idea and let you make a looping structure. This
currently cannot work because timers fire only when the input watermark is
strictly greater than their timestamp. The semantics of "on time" and "final
GC" panes depends on this, so we'd have a lot of work to do. But I think there
might be a consistent world where event time timers are treated as elements,
and fire when the watermark arrives at their timestamp. {{@OnWindowExpiration}}
is then absolutely required and cannot be simulated by a timer.
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)