[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062881#comment-16062881 ]

Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

[~lzljs3620320]/[~jkff] I looked into this again and I think I finally found 
all the issues:

1. Processing-time timers are in fact dropped, but I'm wondering whether this is
actually "working as intended". Consider a stateful {{DoFn}} that sets a
processing-time timer for some time in the future. Before this timer fires, the
sources terminate (they send the +Inf watermark and the runner can shut down,
although I think this is questionable). The runner still has that pending
processing-time timer: should it block shutdown until that timer fires?
Fire it right away? Or drop it? (Flink currently shuts down, thereby
dropping the pending timer.) Maybe [~kenn] also has an opinion on this, since
it is about stateful/timely {{DoFn}}s in general.

2. {{SplittableParDoViaKeyedWorkItems.ProcessFn}} doesn't behave as a
stateful/timely {{DoFn}} should. It uses {{TimerInternals}} and, instead of
having an {{@OnTimer}} method, expects firing timers to arrive in the form of a
{{KeyedWorkItem}}. This circumvents {{DoFnRunner.onTimer()}}, which is
especially bad for {{StatefulDoFnRunner}} because it has extra logic in
{{onTimer()}}. It also leads to this somewhat awkward code in the Flink Runner,
where I manually filter out an event-time timer because {{ProcessFn}} does not
expect it:
https://github.com/apache/beam/blob/c10c4da9ab1bdbfb2530aa5d5f3ddb0670594397/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L155-L155
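To make the three options in point 1 concrete, here is a minimal, runner-agnostic sketch in plain Java. It deliberately does not use the Beam or Flink APIs, and every name in it ({{PendingTimerSketch}}, {{ShutdownPolicy}}, {{PendingTimer}}, {{TimerCallback}}, {{shutdown}}) is hypothetical. {{TimerCallback.onTimer}} only stands in for a single dispatch point such as {{DoFnRunner.onTimer()}}, i.e. the path that point 2 says {{ProcessFn}} bypasses. {{DROP}} corresponds to what Flink currently does.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical model of a runner's pending processing-time timers; not Beam or Flink code.
class PendingTimerSketch {
  // The three options from point 1. DROP mirrors Flink's current behavior.
  enum ShutdownPolicy { DROP, FIRE_IMMEDIATELY, WAIT_UNTIL_DUE }

  static final class PendingTimer {
    final String id;
    final long fireAtMillis;

    PendingTimer(String id, long fireAtMillis) {
      this.id = id;
      this.fireAtMillis = fireAtMillis;
    }
  }

  // Single dispatch point for fired timers (a stand-in for something like
  // DoFnRunner.onTimer()); wrapper logic placed here would run for every timer.
  interface TimerCallback {
    void onTimer(PendingTimer timer, long firedAtMillis);
  }

  // Earliest-due timer first.
  private final PriorityQueue<PendingTimer> pending =
      new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.fireAtMillis));

  void setProcessingTimeTimer(PendingTimer timer) {
    pending.add(timer);
  }

  /**
   * Called after the sources have emitted the +Inf watermark. Drains all pending
   * processing-time timers according to the policy and returns the simulated
   * processing time at which the runner could shut down.
   */
  long shutdown(ShutdownPolicy policy, long nowMillis, TimerCallback callback) {
    long clock = nowMillis;
    while (!pending.isEmpty()) {
      PendingTimer timer = pending.poll();
      switch (policy) {
        case DROP:
          // Discarded: the callback never runs for this timer.
          break;
        case FIRE_IMMEDIATELY:
          // Fired now, even though its processing time has not been reached.
          callback.onTimer(timer, clock);
          break;
        case WAIT_UNTIL_DUE:
          // Simulated blocking: advance the clock to the timer's due time first.
          clock = Math.max(clock, timer.fireAtMillis);
          callback.onTimer(timer, clock);
          break;
      }
    }
    return clock;
  }
}
```

With timers due at 200 and 500 and shutdown requested at 100, {{WAIT_UNTIL_DUE}} fires both at their due times and delays shutdown until 500, {{FIRE_IMMEDIATELY}} fires both at 100, and {{DROP}} fires nothing.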

What do you think?

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
