[
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16070150#comment-16070150
]
ASF GitHub Bot commented on BEAM-2140:
--------------------------------------
GitHub user aljoscha opened a pull request:
https://github.com/apache/beam/pull/3480
[BEAM-2140] Execute Splittable DoFn directly in Flink Runner
Before, we were using ProcessFn. This was causing problems with the
Flink Runner for two reasons:
1. StatefulDoFnRunner is in the processing path, which means
processing-time timers are being dropped when the watermark reaches +Inf
2. When a pipeline shuts down (for example, when bounded sources shut
down) Flink will drop any outstanding processing-time timers, meaning
that any remaining Restrictions will not be processed.
The fix for 1. is to execute the splittable DoFn directly, thereby
bypassing the late data/timer dropping logic.
The fix for 2. builds on the fix for 1. and also introduces a "last
resort" event-time timer that fires at +Inf and makes sure that any
remaining restrictions are exhausted.
R: @jkff Not sure if we want to fix it like this or maybe adapt
`ProcessFn` and remove `StatefulDoFnRunner` from the processing path.
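To illustrate the "last resort" timer idea described above, here is a minimal toy model in plain Java. It is not Beam or Flink code: `ToyOperator`, `processRestriction`, `onWatermark`, and the per-call element limit are all hypothetical names invented for this sketch. It assumes that a restriction which is not finished in one call leaves a residual that would normally be resumed by a processing-time timer, and that those timers never fire once the pipeline shuts down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only; every name in here is hypothetical, not a Beam or Flink API. */
final class LastResortTimerSketch {

    /** Stand-in for the +Inf watermark seen when bounded sources finish. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    static final class ToyOperator {
        // Residual restrictions [from, to) that wait to be resumed.
        private final Deque<long[]> pending = new ArrayDeque<>();
        private final List<Long> output = new ArrayList<>();
        private final boolean lastResortTimer;
        private final int maxPerCall;

        ToyOperator(boolean lastResortTimer, int maxPerCall) {
            this.lastResortTimer = lastResortTimer;
            this.maxPerCall = maxPerCall;
        }

        /** Emits at most maxPerCall elements, then checkpoints the remainder. */
        void processRestriction(long from, long to) {
            long end = Math.min(to, from + maxPerCall);
            for (long i = from; i < end; i++) {
                output.add(i);
            }
            if (end < to) {
                // In the real runner a processing-time timer would resume this.
                pending.add(new long[] {end, to});
            }
        }

        /** Models shutdown: processing-time timers never fire after this point. */
        void onWatermark(long watermark) {
            if (watermark == MAX_WATERMARK && lastResortTimer) {
                // The event-time timer set for +Inf fires and drains everything.
                while (!pending.isEmpty()) {
                    long[] residual = pending.poll();
                    processRestriction(residual[0], residual[1]);
                }
            }
        }

        List<Long> output() {
            return output;
        }
    }

    /** Processes the restriction [0, 5) two elements at a time, then shuts down. */
    static List<Long> run(boolean lastResortTimer) {
        ToyOperator op = new ToyOperator(lastResortTimer, 2);
        op.processRestriction(0, 5);
        op.onWatermark(MAX_WATERMARK);
        return op.output();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [0, 1]
        System.out.println(run(true));  // prints [0, 1, 2, 3, 4]
    }
}
```

Without the last-resort timer the residual [2, 5) is silently lost at shutdown; with it, the residual is drained before the operator finishes.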
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/beam fix-flink-splittable-dofn-squashed
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/beam/pull/3480.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3480
----
commit e30efe70bdafbbcc5bc1082f980867e58684c351
Author: Aljoscha Krettek <[email protected]>
Date: 2017-06-26T10:10:18Z
[BEAM-2140] Execute Splittable DoFn directly in Flink Runner
Before, we were using ProcessFn. This was causing problems with the
Flink Runner for two reasons:
1. StatefulDoFnRunner is in the processing path, which means
processing-time timers are being dropped when the watermark reaches +Inf
2. When a pipeline shuts down (for example, when bounded sources shut
down) Flink will drop any outstanding processing-time timers, meaning
that any remaining Restrictions will not be processed.
The fix for 1. is to execute the splittable DoFn directly, thereby
bypassing the late data/timer dropping logic.
The fix for 2. builds on the fix for 1. and also introduces a "last
resort" event-time timer that fires at +Inf and makes sure that any
remaining restrictions are exhausted.
----
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.