[
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062881#comment-16062881
]
Aljoscha Krettek commented on BEAM-2140:
----------------------------------------
[~lzljs3620320]/[~jkff] I looked into this again and I think I finally found
all the issues:
1. Processing-time timers are in fact dropped but I'm wondering whether this is
actually "working as intended". Consider a stateful {{DoFn}} that sets a
processing-time timer for some time in the future. Before this timer fires the
sources terminate (they send the +Inf watermark and the runner can shut down,
although I think this is questionable). The Runner still has that pending
processing-time timer, should it block shutting down until that timer is fired?
Or fire it right away? Or drop it? (Flink currently shuts down, thereby
dropping that pending timer). Maybe [~kenn] also has an opinion on this since
it is about stateful/timely {{DoFn}} in general.
2. {{SplittableParDoViaKeyedWorkItems.ProcessFn}} doesn't behave as a
stateful/timely {{DoFn}} should. It uses {{TimerInternals}} and instead of
having an {{@OnTimer}} method it expects firing timers to come in the form of a
{{KeyedWorkIterm}}. This messes with {{DoFnRunner.onTimer()}} (because it
circumvents it) which is especially bad for {{StatefulDoFnRunner}} which has
extra logic in {{onTimer()}}. It also leads to this somewhat awkward code in
the Flink Runner where I manually filter out an event-time timer because the
{{ProcessFn}} is not expecting that:
https://github.com/apache/beam/blob/c10c4da9ab1bdbfb2530aa5d5f3ddb0670594397/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L155-L155
What do you think?
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)