GitHub user aljoscha opened a pull request:

    https://github.com/apache/beam/pull/3480

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner

    Previously, we used ProcessFn. This caused problems with the
    Flink Runner for two reasons:
    
    1. StatefulDoFnRunner is in the processing path, which means
    processing-time timers are dropped when the watermark reaches +Inf.
    
    2. When a pipeline shuts down (for example, when bounded sources shut
    down), Flink drops any outstanding processing-time timers, meaning
    that any remaining restrictions are never processed.
    
    The fix for 1. is to execute the splittable DoFn directly, thereby
    bypassing the late data/timer dropping logic.
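To illustrate fix 1, here is a minimal, self-contained toy model (not Beam or Flink code). It contrasts a delivery path that drops timers once the watermark has passed the end of their window with a direct path that has no such check. The class and method names are hypothetical. Using `Long.MAX_VALUE` as "+Inf", with a window end just below it, is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not Beam or Flink code) of why a processing-time
 * timer can be lost when it goes through late-data/timer dropping logic.
 */
public class TimerDroppingSketch {
  // Assumed stand-in for a watermark of "+Inf".
  static final long WATERMARK_INF = Long.MAX_VALUE;

  // Assumed stand-in for the end of the global window, which lies before +Inf.
  static final long GLOBAL_WINDOW_END = Long.MAX_VALUE - 1;

  // Records which timers actually reached the DoFn logic.
  final List<String> fired = new ArrayList<>();

  /** Wrapper path: drops the timer if the watermark has passed the window end. */
  void deliverViaLateDroppingWrapper(String timerId, long watermark) {
    if (watermark > GLOBAL_WINDOW_END) {
      return; // window considered expired, so the timer is silently dropped
    }
    fired.add(timerId);
  }

  /** Direct path: no expiry check, so the timer always reaches the DoFn logic. */
  void deliverDirectly(String timerId) {
    fired.add(timerId);
  }

  public static void main(String[] args) {
    TimerDroppingSketch viaWrapper = new TimerDroppingSketch();
    viaWrapper.deliverViaLateDroppingWrapper("resume-restriction", WATERMARK_INF);

    TimerDroppingSketch direct = new TimerDroppingSketch();
    direct.deliverDirectly("resume-restriction");

    System.out.println("wrapper fired: " + viaWrapper.fired); // wrapper fired: []
    System.out.println("direct fired: " + direct.fired); // direct fired: [resume-restriction]
  }
}
```

In this sketch, only the direct path still resumes the restriction after the watermark reaches +Inf.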
    
    The fix for 2. builds on the fix for 1. and also introduces a "last
    resort" event-time timer that fires at +Inf and ensures that any
    remaining restrictions are exhausted.
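The "last resort" idea in fix 2 can be pictured with the following toy simulation (hypothetical, not the actual runner code). Pending restrictions are normally advanced by processing-time timers. If those timers are discarded at shutdown, an event-time timer registered at +Inf drains whatever is left. Modeling per-key work as a queue of restriction strings, and `Long.MAX_VALUE` as +Inf, are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy model (hypothetical, not Beam or Flink code) of a "last resort"
 * event-time timer at +Inf that exhausts any remaining restrictions.
 */
public class LastResortTimerSketch {
  static final long WATERMARK_INF = Long.MAX_VALUE; // assumed stand-in for +Inf

  // Hypothetical per-key state: restrictions that still have work left.
  final Deque<String> pendingRestrictions = new ArrayDeque<>();
  final List<String> processed = new ArrayList<>();
  boolean lastResortTimerSet = false;

  void addRestriction(String restriction) {
    pendingRestrictions.add(restriction);
    // Besides the normal processing-time timer (not modeled), make sure an
    // event-time timer at +Inf is registered as a safety net.
    lastResortTimerSet = true;
  }

  /** Normal path: a processing-time timer fires and processes one restriction. */
  void onProcessingTimer() {
    String r = pendingRestrictions.poll();
    if (r != null) {
      processed.add(r);
    }
  }

  /** When the watermark reaches +Inf, the last-resort timer drains all work. */
  void advanceWatermark(long watermark) {
    if (watermark == WATERMARK_INF && lastResortTimerSet) {
      while (!pendingRestrictions.isEmpty()) {
        processed.add(pendingRestrictions.poll());
      }
      lastResortTimerSet = false;
    }
  }

  public static void main(String[] args) {
    LastResortTimerSketch sdf = new LastResortTimerSketch();
    sdf.addRestriction("[0, 50)");
    sdf.addRestriction("[50, 100)");
    sdf.onProcessingTimer(); // processes "[0, 50)"
    // Shutdown: the processing-time timer for "[50, 100)" is dropped here.
    sdf.advanceWatermark(WATERMARK_INF); // last-resort timer drains the rest
    System.out.println(sdf.processed); // [[0, 50), [50, 100)]
  }
}
```

Without the `advanceWatermark(WATERMARK_INF)` step, `"[50, 100)"` would remain unprocessed in this model. That gap is the failure mode described in point 2.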
    
    R: @jkff Not sure whether we want to fix it like this or instead adapt `ProcessFn` and remove `StatefulDoFnRunner` from the processing path.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/beam fix-flink-splittable-dofn-squashed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3480
    
----
commit e30efe70bdafbbcc5bc1082f980867e58684c351
Author: Aljoscha Krettek <[email protected]>
Date:   2017-06-26T10:10:18Z

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner
    
    Previously, we used ProcessFn. This caused problems with the
    Flink Runner for two reasons:
    
    1. StatefulDoFnRunner is in the processing path, which means
    processing-time timers are dropped when the watermark reaches +Inf.
    
    2. When a pipeline shuts down (for example, when bounded sources shut
    down), Flink drops any outstanding processing-time timers, meaning
    that any remaining restrictions are never processed.
    
    The fix for 1. is to execute the splittable DoFn directly, thereby
    bypassing the late data/timer dropping logic.
    
    The fix for 2. builds on the fix for 1. and also introduces a "last
    resort" event-time timer that fires at +Inf and ensures that any
    remaining restrictions are exhausted.

----

