[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069802#comment-16069802 ]
Aljoscha Krettek commented on BEAM-2140:
----------------------------------------
Yep, in the Flink Runner the processing path for {{ProcessFn}} contains
{{StatefulDoFnRunner}}; that's why timers were dropped once the input watermark
went to +Inf. I fixed this in the branch I posted earlier by changing
{{SplittableDoFnOperator}} to no longer use that code path and instead use
completely custom code for processing a splittable DoFn:
https://github.com/apache/beam/blob/10b1b598100541ff37734a04850ada45fc362b99/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L73-L73.
This fixed the problem of dropped processing-time timers.
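To make that failure mode concrete, here is a hypothetical model of a runner that drops timers for windows it considers expired. It is not the actual {{StatefulDoFnRunner}} code. It assumes that +Inf is modelled as {{Long.MAX_VALUE}} and that even the global window ends strictly before +Inf; the names {{ExpiredWindowTimers}}, {{GLOBAL_WINDOW_END}} and {{fireTimers}} are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a runner that drops timers for expired windows.
 * Not the actual StatefulDoFnRunner code; all names are illustrative.
 */
final class ExpiredWindowTimers {
    /** Stand-in for the +Inf watermark (assumption: modelled as Long.MAX_VALUE). */
    static final long WATERMARK_POS_INF = Long.MAX_VALUE;

    /** Assumed end of the global window: strictly before +Inf. */
    static final long GLOBAL_WINDOW_END = WATERMARK_POS_INF - 1;

    /** A window counts as expired once the input watermark has passed its end. */
    static boolean isExpired(long windowEnd, long inputWatermark) {
        return inputWatermark > windowEnd;
    }

    /** Returns the timers that actually fire; timers of an expired window are dropped. */
    static List<String> fireTimers(List<String> timerIds, long windowEnd, long inputWatermark) {
        if (isExpired(windowEnd, inputWatermark)) {
            return new ArrayList<>(); // every pending timer is silently dropped
        }
        return new ArrayList<>(timerIds);
    }

    public static void main(String[] args) {
        List<String> timers = List.of("resume-restriction");
        // Watermark still finite: the SDF's resume timer fires.
        System.out.println(fireTimers(timers, GLOBAL_WINDOW_END, 1_000L));
        // Watermark at +Inf: even the global window is expired, so the timer is dropped.
        System.out.println(fireTimers(timers, GLOBAL_WINDOW_END, WATERMARK_POS_INF));
    }
}
```

Running {{main}} prints {{[resume-restriction]}} and then {{[]}}: as soon as the input watermark reaches +Inf, the timer that would have resumed the restriction is discarded without firing.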
The other problem (in the Flink Runner) was that processing-time timers are
simply dropped when the pipeline is shutting down. I'm working around this by
setting a "last resort" event-time timer that fires when the watermark goes to
+Inf. When that timer fires, I process the remaining restrictions until they're exhausted.
Splittable DoFn processing in the {{SplittableDoFnOperator}} is now split
(hehe) into three methods:
* {{processElement()}}: seed state, process the restriction once, set the next
processing-time timer and set the last-resort event-time timer
* {{onProcessingTime()}}: process the restriction once and set the next
processing-time timer; clean up all state if the restriction is exhausted
* {{onEventTime()}}: process the restriction until it is exhausted, then clean up all state
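The three-method split can be sketched as a small, runner-independent model. This is not the actual {{SplittableDoFnOperator}} code: the offset-range restriction, the batch size of three offsets per "process once" call, the in-memory timer lists, and all names ({{SdfOperatorSketch}}, {{shutdownDropsProcessingTimers}}, {{advanceWatermark}}) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, runner-independent model of the three-method SDF processing.
 * Not the actual SplittableDoFnOperator; restriction type, BATCH and names are assumptions.
 */
final class SdfOperatorSketch {
    static final long WATERMARK_POS_INF = Long.MAX_VALUE; // stand-in for +Inf
    static final int BATCH = 3; // offsets handled per "process restriction once" call

    private final Map<String, long[]> restrictions = new HashMap<>(); // key -> remaining [from, to)
    private final List<String> processingTimers = new ArrayList<>();  // keys with a pending processing-time timer
    private final List<String> eventTimers = new ArrayList<>();       // keys with a last-resort timer at +Inf
    final List<String> output = new ArrayList<>();

    /** Processes up to BATCH offsets; returns true if the restriction is now exhausted. */
    private boolean processOnce(String key) {
        long[] r = restrictions.get(key);
        long end = Math.min(r[0] + BATCH, r[1]);
        for (long i = r[0]; i < end; i++) {
            output.add(key + ":" + i);
        }
        r[0] = end; // the residual restriction is now [end, to)
        return r[0] >= r[1];
    }

    /** Removes all per-key state and any pending timers. */
    private void cleanUp(String key) {
        restrictions.remove(key);
        processingTimers.remove(key);
        eventTimers.remove(key);
    }

    /** Seed state, process once, set next processing-time timer and last-resort event-time timer. */
    void processElement(String key, long from, long to) {
        restrictions.put(key, new long[] {from, to});
        processOnce(key);
        processingTimers.add(key);
        eventTimers.add(key);
    }

    /** Process once per pending timer; re-arm the timer, or clean up if exhausted. */
    void onProcessingTime() {
        for (String key : new ArrayList<>(processingTimers)) {
            processingTimers.remove(key);
            if (processOnce(key)) {
                cleanUp(key);
            } else {
                processingTimers.add(key);
            }
        }
    }

    /** Models the runner discarding processing-time timers during shutdown. */
    void shutdownDropsProcessingTimers() {
        processingTimers.clear();
    }

    /** Fires the last-resort timers once the watermark reaches +Inf. */
    void advanceWatermark(long watermark) {
        if (watermark < WATERMARK_POS_INF) {
            return;
        }
        for (String key : new ArrayList<>(eventTimers)) {
            onEventTime(key);
        }
    }

    /** Process the restriction until it is exhausted, then clean up all state. */
    void onEventTime(String key) {
        while (!processOnce(key)) {
            // keep draining the residual restriction
        }
        cleanUp(key);
    }

    boolean hasState() {
        return !restrictions.isEmpty();
    }

    public static void main(String[] args) {
        SdfOperatorSketch op = new SdfOperatorSketch();
        op.processElement("a", 0, 10);          // emits a:0..a:2
        op.onProcessingTime();                  // emits a:3..a:5, re-arms the timer
        op.shutdownDropsProcessingTimers();     // pipeline shutting down
        op.advanceWatermark(WATERMARK_POS_INF); // last resort: emits a:6..a:9
        System.out.println(op.output.size() + " " + op.hasState());
    }
}
```

Running {{main}} prints {{10 false}}: all ten offsets are emitted and no state is left behind. Without the event-time timer, offsets a:6 to a:9 would be lost once the pending processing-time timer is discarded at shutdown.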
There is no way to stop Flink from dropping processing-time timers, so if
we want the Flink Runner to use {{ProcessFn}} directly, we should add
this "last resort" timer there as well. I think it makes sense to have this in
general anyway. [~jkff] what do you think about this?
Regarding state leakage: AFAIK a splittable DoFn is not allowed to have any
custom state or timers, right? And {{ProcessFn}} makes sure to clean up the
element state and restriction state when a restriction is exhausted, so there
should be no state leakage, right?
[~jkff] You mentioned that "the input watermark is held by 'pending elements'".
Is this true? I thought that only the output watermark is held by pending
elements.
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)