[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065235#comment-16065235
 ] 

Kenneth Knowles commented on BEAM-2140:
---------------------------------------

I considered for a long time what should happen with processing time triggers 
as far as window expiry. We spent quite some time coming up with the semantics 
at https://s.apache.org/beam-lateness#heading=h.hot1g47sz45s, long before Beam. 
I don't claim it is perfect (it is way too complex, for one) but it represents 
a lot of thought by lots of people. I think actually it does give some choices.

* an input being droppable does not necessarily mean you are required to drop 
it (some transforms may falter on droppable inputs, but that is specific to the 
transform)
* input timestamp and output timestamp are decoupled, so you can reason about 
whether to ignore input based on whether the resulting output would be droppable

Some possibilities that I think don't break the model:

*Treat processing time timers as inputs with some timestamp at EOW or some such*

The theme that timers are inputs is basically valid. We gain clarity by not 
conflating them in APIs and discussions. But how they interact with watermarks, 
etc, should be basically compatible. Currently processing time timers are 
treated as inputs with a timestamp equal to the input watermark at the moment 
of their arrival. So this change would cause an input hold because there is a 
known upcoming element that just hasn't arrived.

In streaming: this holds things up too much. It also makes repeatedly firing 
after processing time cause an infinite loops, versus what happens today where 
it naturally goes through window expiry and GC.

In batch: this breaks the unified model for processing historical data in a 
batch mode. With the semantics as they exist today, the way that batch "runs" 
triggers and processing time timers (by ignoring them) is completely compatible 
with the semantics. So any user who writes a correct transform has good 
assurances they it will work in both modes. If processing time timers held 
watermarks like this they would need to be processed in batch mode, yet they 
are contradictory with the whole point of it.

We can omit unbounded SDFs from this unification issue, probably, but a 
bounded-per-element SDF should certainly work on streamed unbounded input as 
well as bounded input.

*Decide whether to drop a processing time timer not based on the input 
watermark but based on whether its output would be droppable*

This lets the input watermark advance, but still does not allow infinitely 
repeating processing time timers to terminate with window expiry automatically, 
and it still breaks the unified model. We could alleviate both issues by 
refusing to set new timers that would already be expired. I think this is just 
a rabbit hole of unnatural corner cases so we should avoid it.

*In addition to the processing time timers that ProcessFn sets, also set a GC 
timer*

This seems straightforward and a simple and good idea. These timers are also 
still run in batch mode for historical reprocessing.

Can you clarify how it does not work? Is it because you need to create a "loop" 
that continues to fire until the residual is gone? Currently, there is simply 
no way to make a perpetual loop with timers because of the commentary below.

*Treat event time timers as inputs with their given timestamp*

This would combine the GC timer idea and let you make a looping structure. This 
currently cannot work because timers fire only when the input watermark is 
strictly greater than their timestamp. The semantics of "on time" and "final 
GC" panes depends on this, so we'd have a lot of work to do. But I think there 
might be a consistent world where event time timers are treated as elements, 
and fire when the watermark arrives at their timestamp. {{@OnWindowExpiration}} 
is then absolutely required and cannot be simulated by a timer.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to