[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069802#comment-16069802 ]

Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

Yep, in the Flink Runner the processing path for {{ProcessFn}} contains 
{{StatefulDoFnRunner}}; that's why timers were dropped once the input watermark 
went to +Inf. I fixed this in the branch I posted earlier by changing 
{{SplittableDoFnOperator}} to no longer use that code path and instead use 
completely custom code for processing a splittable DoFn: 
https://github.com/apache/beam/blob/10b1b598100541ff37734a04850ada45fc362b99/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L73-L73
This fixed the problem of dropped processing-time timers.
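To make the failure mode concrete, here is a minimal sketch of the kind of late-timer check involved. It assumes a simplified model in which a processing-time timer is skipped once the input watermark has passed the end of its window; the class, constants, and method names are hypothetical and are not Beam's actual {{StatefulDoFnRunner}} code.

```java
// Hypothetical, simplified model of a late-timer check; not Beam's actual classes.
class LateTimerSketch {
  // Hypothetical stand-ins for timestamp bounds, in millis.
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;
  static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE - 86_400_000L;

  enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

  /** A window counts as expired once the input watermark has passed its end plus allowed lateness. */
  static boolean isLate(long windowMaxTimestamp, long allowedLateness, long inputWatermark) {
    return windowMaxTimestamp + allowedLateness < inputWatermark;
  }

  /** Returns true if the timer would be delivered to the DoFn, false if it is dropped. */
  static boolean deliverTimer(TimeDomain domain, long windowMaxTimestamp, long inputWatermark) {
    if (domain == TimeDomain.PROCESSING_TIME && isLate(windowMaxTimestamp, 0L, inputWatermark)) {
      return false; // processing-time timer on an expired window: silently dropped
    }
    return true;
  }

  public static void main(String[] args) {
    long beforeShutdown = 1_000L;
    System.out.println(deliverTimer(TimeDomain.PROCESSING_TIME, END_OF_GLOBAL_WINDOW, beforeShutdown)); // true
    System.out.println(deliverTimer(TimeDomain.PROCESSING_TIME, END_OF_GLOBAL_WINDOW, POSITIVE_INFINITY)); // false
  }
}
```

In this model, once the watermark reaches +Inf every window counts as expired, so every pending processing-time timer is skipped.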

The other problem (in the Flink Runner) was that processing-time timers are 
simply dropped if the pipeline is shutting down. I'm getting around this by 
setting a "last resort" event-time timer that fires when the watermark goes to 
+Inf. When that timer fires, I process the remaining restrictions until they're 
exhausted. Splittable DoFn processing in the {{SplittableDoFnOperator}} is now 
split (hehe) into three methods:
 * {{processElement()}}: seed state and process the restriction once, set the 
next processing-time timer, and set the last resort event-time timer
 * {{onProcessingTime()}}: process the restriction once and set the next 
processing-time timer; clean up all state if the restriction is exhausted
 * {{onEventTime()}}: process the restriction until exhausted, then clean up all 
state
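The three methods above can be sketched as a toy, single-threaded simulation. Everything in it is hypothetical: the offset-range restriction, the per-invocation budget {{OFFSETS_PER_INVOCATION}}, and the maps standing in for Flink keyed state and timer services. It only illustrates the control flow, not the real {{SplittableDoFnOperator}}.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the three-method SDF operator; not the Flink Runner's code.
class LastResortTimerSketch {
  // Hypothetical restriction: the half-open offset range [from, to).
  static final class OffsetRange {
    long from;
    final long to;
    OffsetRange(long from, long to) { this.from = from; this.to = to; }
    boolean isExhausted() { return from >= to; }
  }

  // Hypothetical "process once" budget: offsets claimed per invocation.
  static final int OFFSETS_PER_INVOCATION = 2;

  final Map<String, OffsetRange> restrictionState = new HashMap<>(); // per-key restriction state
  final Map<String, Boolean> processingTimerSet = new HashMap<>();   // pending processing-time timers
  final Map<String, Boolean> lastResortTimerSet = new HashMap<>();   // pending event-time timers at +Inf
  final List<String> output = new ArrayList<>();

  // Claims up to OFFSETS_PER_INVOCATION offsets and emits one output per claimed offset.
  private void processOnce(String key) {
    OffsetRange r = restrictionState.get(key);
    for (int i = 0; i < OFFSETS_PER_INVOCATION && !r.isExhausted(); i++) {
      output.add(key + "@" + r.from);
      r.from++;
    }
  }

  // Removes all state and pending timers for the key.
  private void cleanup(String key) {
    restrictionState.remove(key);
    processingTimerSet.remove(key);
    lastResortTimerSet.remove(key);
  }

  void processElement(String key, long from, long to) {
    restrictionState.put(key, new OffsetRange(from, to)); // seed state
    processOnce(key);
    processingTimerSet.put(key, true); // next processing-time timer
    lastResortTimerSet.put(key, true); // last resort event-time timer at +Inf
  }

  void onProcessingTime(String key) {
    processingTimerSet.remove(key);
    if (!restrictionState.containsKey(key)) return; // already cleaned up: nothing to do
    processOnce(key);
    if (restrictionState.get(key).isExhausted()) {
      cleanup(key);
    } else {
      processingTimerSet.put(key, true);
    }
  }

  void onEventTime(String key) {
    if (!restrictionState.containsKey(key)) return; // already cleaned up: nothing to do
    while (!restrictionState.get(key).isExhausted()) {
      processOnce(key);
    }
    cleanup(key);
  }

  // Shutdown: pending processing-time timers are dropped; the watermark reaching +Inf
  // fires every pending last resort event-time timer.
  void shutdown() {
    processingTimerSet.clear();
    for (String key : new ArrayList<>(lastResortTimerSet.keySet())) {
      onEventTime(key);
    }
  }

  public static void main(String[] args) {
    LastResortTimerSketch op = new LastResortTimerSketch();
    op.processElement("a", 0, 5); // emits a@0, a@1
    op.onProcessingTime("a");     // emits a@2, a@3
    op.shutdown();                // processing-time timer dropped; last resort timer emits a@4
    System.out.println(op.output);           // [a@0, a@1, a@2, a@3, a@4]
    System.out.println(op.restrictionState); // {}
  }
}
```

Because {{cleanup()}} runs on both exhaustion paths, the sketch leaves no restriction state behind after shutdown, even though the pending processing-time timer never fired.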

There is no way of getting around Flink dropping processing-time timers, so if 
we want the Flink Runner to use {{ProcessFn}} directly, we should add this 
"last resort" timer there as well. I think it makes sense to have this in 
general anyway. [~jkff] what do you think about this?

Regarding state leakage: AFAIK a splittable DoFn is not allowed to have any 
custom state or timers, right? And {{ProcessFn}} makes sure to clean up the 
element state and restriction state when a restriction is exhausted, so there 
should be no state leakage, right?

[~jkff] You mentioned that "the input watermark is held by "pending elements"". 
Is this true? I thought that only the output watermark is held by pending 
elements.
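For reference, here is a minimal sketch of the watermark model as the question frames it: a step's input watermark is the minimum of its upstream output watermarks, and its output watermark is additionally held back by holds such as pending elements. The class and method names are hypothetical, and the sketch does not settle how Beam or Flink actually hold watermarks for {{ProcessFn}}.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of per-step watermarks as framed in the question; not Beam/Flink code.
class WatermarkSketch {
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;

  // Input watermark: the minimum of the upstream steps' output watermarks.
  static long inputWatermark(List<Long> upstreamOutputWatermarks) {
    long min = POSITIVE_INFINITY;
    for (long wm : upstreamOutputWatermarks) {
      min = Math.min(min, wm);
    }
    return min;
  }

  // Output watermark: the input watermark, held back by the earliest hold
  // (for example, the timestamp of a still-pending element).
  static long outputWatermark(long inputWatermark, List<Long> holds) {
    long min = inputWatermark;
    for (long hold : holds) {
      min = Math.min(min, hold);
    }
    return min;
  }

  public static void main(String[] args) {
    long in = inputWatermark(Arrays.asList(POSITIVE_INFINITY, POSITIVE_INFINITY));
    long out = outputWatermark(in, Arrays.asList(42L)); // one pending element at t=42
    System.out.println(in == POSITIVE_INFINITY); // true: the input watermark has reached +Inf
    System.out.println(out);                     // 42: the output watermark is still held
  }
}
```

Under this model, a pending element can keep the output watermark below +Inf even after the input watermark has reached +Inf.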

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
