GitHub user aljoscha opened a pull request:

    https://github.com/apache/incubator-beam/pull/328

    [BEAM-270] Support Timestamps/Windows in Flink Batch

    This change sits on top of #291 
    
    With this we can run all of the `RunnableOnService` tests on the Flink
    batch runner. Almost all of our own Flink-specific IT cases now fail because
    they verified the older, incomplete support for the Beam model. I did not
    remove them, since we should discuss whether to keep all of them or only
    those that test a specific integration with Beam. If we keep them, they
    have to be fixed.
    
    For now `CombineTest.testSessionsCombineWithContext` fails. I'm inserting a
    pre-shuffle combine phase, but with merging windows the correct, final
    window in which an element will reside is not yet known in the pre-shuffle
    combine. This means that we don't get the correct side input for those
    values in the `CombineFnWithContext`. To fix the failing test I can get rid
    of the pre-shuffle combine; this means more network traffic, but the
    results are correct. What are your thoughts on this?
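
As a rough illustration of the issue described above, here is a toy model of session-window merging. It is not code from this PR and uses no Beam or Flink API; `SessionMergeSketch`, `Interval`, `protoWindow` and `merge` are hypothetical names, and the gap of 10 is an arbitrary assumption. It shows that the window an element carries before the shuffle can differ from the merged window it ends up in, which is why a per-window side-input lookup made in the pre-shuffle phase can hit the wrong window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy model only: none of these names are Beam or Flink API.
final class SessionMergeSketch {

  /** Half-open interval [start, end), standing in for a session window. */
  static final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** Window assigned to a single element before any merging has happened. */
  static Interval protoWindow(long timestamp, long gap) {
    return new Interval(timestamp, timestamp + gap);
  }

  /** Merges overlapping intervals; needs all windows of a key to be available. */
  static List<Interval> merge(List<Interval> windows) {
    List<Interval> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((Interval w) -> w.start));
    List<Interval> merged = new ArrayList<>();
    for (Interval w : sorted) {
      int last = merged.size() - 1;
      if (last >= 0 && w.start < merged.get(last).end) {
        // Overlaps the previous merged window: extend it instead of adding a new one.
        Interval prev = merged.get(last);
        merged.set(last, new Interval(prev.start, Math.max(prev.end, w.end)));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    long gap = 10; // assumed session gap, chosen only for the example
    List<Interval> proto =
        Arrays.asList(protoWindow(0, gap), protoWindow(5, gap), protoWindow(30, gap));
    System.out.println("windows seen pre-shuffle: " + proto);
    System.out.println("windows after merging:    " + merge(proto));
  }
}
```

In this sketch the element at timestamp 0 is combined under its own proto-window before the shuffle, while its final window is only known once the windows at timestamps 0 and 5 have been merged.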
    
    For GroupByKey/Combine.PerKey I'm doing a shuffle by key and then an
    in-memory pass over the windows. For non-merging windows this could also be
    changed to do a shuffle by key-and-window, but then we would have to
    explode all windows on the send side.
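
To make the key-and-window alternative concrete, here is a minimal sketch of what exploding windows on the send side could look like. It is a toy model, not the PR's implementation: `Windowed` is a hypothetical stand-in rather than Beam's `WindowedValue`, windows are plain strings, and the `key@window` shuffle key is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: not Beam's WindowedValue and not Flink API.
final class ExplodeWindowsSketch {

  /** A keyed value that may belong to several windows at once (hypothetical). */
  static final class Windowed {
    final String key;
    final int value;
    final List<String> windows;

    Windowed(String key, int value, List<String> windows) {
      this.key = key;
      this.value = value;
      this.windows = windows;
    }
  }

  /** Send side: emit one single-window copy per window the element is in. */
  static List<Windowed> explode(Windowed in) {
    List<Windowed> out = new ArrayList<>();
    for (String window : in.windows) {
      out.add(new Windowed(in.key, in.value, Collections.singletonList(window)));
    }
    return out;
  }

  /** Receive side: group the exploded records by the composite (key, window). */
  static Map<String, List<Integer>> groupByKeyAndWindow(List<Windowed> input) {
    Map<String, List<Integer>> groups = new TreeMap<>();
    for (Windowed in : input) {
      for (Windowed single : explode(in)) {
        String shuffleKey = single.key + "@" + single.windows.get(0);
        groups.computeIfAbsent(shuffleKey, k -> new ArrayList<>()).add(single.value);
      }
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Windowed> input = Arrays.asList(
        new Windowed("a", 1, Arrays.asList("w0", "w1")),
        new Windowed("a", 2, Arrays.asList("w1")),
        new Windowed("b", 3, Arrays.asList("w0")));
    System.out.println(groupByKeyAndWindow(input));
  }
}
```

The cost mentioned above shows up in `explode`: an element that sits in several windows is sent once per window, whereas a shuffle by key alone sends it once.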
    
    **This is the text from the commit in question:**
    
       [BEAM-270] Support Timestamps/Windows in Flink Batch
        
        With this change we always use WindowedValue<T> for the underlying Flink
        DataSets instead of just T. This allows us to support windowing as well.
        
        This also changes a lot of other things enabled by the above:
        
         - Use WindowedValue throughout
         - Add proper translation for Window.into()
         - Make side inputs window aware
         - Make GroupByKey and Combine transformations window aware, this
           includes support for merging windows. GroupByKey is implemented as a
           Combine with a concatenating CombineFn, for simplicity
        
        This removes Flink specific transformations for things that are handled
        by builtin sources/sinks, among other things this:
        
         - Removes special translation for AvroIO.Read/Write and
           TextIO.Read/Write
         - Removes special support for Write.Bound, this was not working properly
           and is now handled by the Beam machinery that uses DoFns for this
         - Removes special translation for binary Co-Group, the code was still
           in there but was never used
        
        With this change all RunnableOnService tests run on Flink Batch.
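
The commit text above mentions implementing GroupByKey as a Combine with a concatenating CombineFn. The following is a minimal sketch of that idea, assuming a hand-rolled `ToyCombineFn` interface whose method names only mirror the usual create/add/merge/extract shape. It is not Beam's `CombineFn` and not the code in this PR; `Concatenate` and `groupValues` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy sketch: a hand-rolled interface, not Beam's Combine.CombineFn.
final class ConcatCombineSketch {

  /** Hypothetical combine interface with input I, accumulator A and output O. */
  interface ToyCombineFn<I, A, O> {
    A createAccumulator();

    A addInput(A accumulator, I input);

    A mergeAccumulators(List<A> accumulators);

    O extractOutput(A accumulator);
  }

  /** Combining by concatenation: the "combined" value is the list of all inputs. */
  static final class Concatenate<T> implements ToyCombineFn<T, List<T>, List<T>> {
    @Override
    public List<T> createAccumulator() {
      return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
      accumulator.add(input);
      return accumulator;
    }

    @Override
    public List<T> mergeAccumulators(List<List<T>> accumulators) {
      List<T> merged = createAccumulator();
      for (List<T> accumulator : accumulators) {
        merged.addAll(accumulator);
      }
      return merged;
    }

    @Override
    public List<T> extractOutput(List<T> accumulator) {
      return accumulator;
    }
  }

  /** Combines each partition on its own, then merges the partial results. */
  static <T> List<T> groupValues(List<List<T>> partitions) {
    Concatenate<T> fn = new Concatenate<>();
    List<List<T>> partials = new ArrayList<>();
    for (List<T> partition : partitions) {
      List<T> accumulator = fn.createAccumulator();
      for (T value : partition) {
        accumulator = fn.addInput(accumulator, value);
      }
      partials.add(accumulator);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    System.out.println(groupValues(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))));
  }
}
```

With this shape, grouping the values of one key is the same fold as any other combine, so one code path can serve both transformations.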
    
    R: @mxm for Flink review
    R: @kennknowles, you are probably interested in how the shuffle/reduce works

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/incubator-beam flink-windowed-value-batch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #328
    
----
commit b7335f9176e72c1dbd8f30f0b770d290e508dd9c
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-02T20:11:12Z

    Add TestFlinkPipelineRunner to FlinkRunnerRegistrar
    
    This makes the runner available for selection by integration tests.

commit 093dc3ec186bc16fd070b09c1678a87dd8f6b47e
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-02T21:04:20Z

    Configure RunnableOnService tests for Flink in batch mode
    
    Today Flink batch supports only global windows. This is a situation we
    intend our build to allow, eventually via JUnit category filtering.
    
    For now all the test classes that use non-global windows are excluded
    entirely via maven configuration. In the future, it should be on a
    per-test-method basis.

commit 3cb997342fc6a8230afcbbb84fb1742a53ef1683
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-02T21:29:30Z

    Add Window.Bound translator to Flink batch
    
    This adds a Window.Bound translator that allows only
    GlobalWindows. It is a temporary measure, but one that
    brings the Flink batch translator in line with the
    Beam model - instead of "ignoring" windows, the GBK
    is a perfectly valid GBK for GlobalWindows.
    
    Previously, the SDK's runner test suite would fail
    due to the lack of a translator - now some of them
    will fail due to windowing support, but others have
    a chance.

commit 6d00cba87a16ff8f2318db87dab23cc09b7d1c20
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T06:26:50Z

    Fix Dangling Flink DataSets

commit af7f8580024466dd85cca8a2c070fd1db67490d4
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T07:38:55Z

    Add hamcrest dep to Flink Runner

commit abad0377e31235134a735e3a2012199bd98b6b16
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T17:32:33Z

    Add RequiresFixedWindows test category

commit b98ee40a9ea32ec1e5f401eacd2300573cea4b7d
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T17:33:05Z

    Exclude RequiresFixedWindows test category from Flink batch tests

commit 0cc7363f8324a3ee0aee687d6e433d7db34de093
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T11:09:28Z

    Fix Flink Create and GroupByNullKeyTest, Remove Special VoidSerializer

commit 5fa21dfa04c4d2cd259af01218e686d55be70b5c
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T11:31:18Z

    Fix faulty Flink Flatten with empty PCollectionList

commit 30818e8f09375bbaa1691962c1a61b7d9206ef86
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T14:39:02Z

    Fix Flink Batch Partial Combine/Combine
    
    We're now using a PerKeyCombineFnRunner for all interaction with the
    CombineFn. This required adding a proper ProcessContext in
    FlinkReduceFunction and FlinkPartialReduceFunction, along with adding
    support for side inputs there.

commit e663de12a9e1fe9dde873d0f4b1c4da1d6819e10
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-06T14:43:31Z

    Disable MaybeEmptyTestITCase
    
    This does not work because Flink Batch does not allow sending null
    elements. This is a pretty deep thing and hard to fix.
    
    In an earlier commit I removed the special TypeSerializer for VoidCoder.
    Before, we got away by always intercepting the VoidCoder and wrapping it
    in a TypeSerializer that would always emit a VoidValue instead of a proper
    null. If the user fn reads this, this will not be consistent with how it
    should behave, however.

commit 130665531db6725b2e2bae3d718176428a099c76
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T17:54:41Z

    Remove unused threadCount from integration tests

commit c3f71e2742a5a0b15601b35f1c15164e14d068d6
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T17:55:16Z

    Disable Flink streaming integration tests for now

commit a0376fffea1b363b07adbe25bc87a75a09a98cd5
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T18:19:33Z

    Change PAssert's dummy inputs from (Void) null to integer 0

commit d60229d7dc79b42e721fe6494939cd8fc3eeefa7
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-06T19:49:55Z

    Special casing job exec AssertionError in TestFlinkPipelineRunner

commit 054a42cbaf6c437facfe958f8a2e5dadb59d3e9a
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-09T10:40:27Z

    Use Int instead of Void in Combine.globally default insertion
    
    The single null value is only used as a dummy, thus can also be an
    integer. This makes it work with runners that don't support sending null
    values.

commit 8f7f9fbec1a23b098973d15cb60ee3bc55749c2f
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-09T12:07:57Z

    Fix accumulator update in Flink Reduce Function

commit 7abf64badb0c9c04b21ed91a96c15200d8b4513c
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-09T12:08:22Z

    Use Int instead of Void as dummy key in Combine.globally
    
    This makes it work with runners that don't support sending null
    values.

commit 6023361838363640815af370aa13572be9571b9c
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-09T12:56:36Z

    Use Int instead of Void in Sample
    
    The single null value is only used as a dummy, thus can also be an
    integer. This makes it work with runners that don't support sending null
    values.

commit fa0f7692e86ffc8057fd6559aa16ca7a89d567fb
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-09T12:57:26Z

    Use Int instead of Void in FlattenTest
    
    The single null value is only used as a dummy, thus can also be an
    integer. This makes it work with runners that don't support sending null
    values.

commit 19c4cd3332597462c1c6036d0ce2f4429f3bf408
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-09T16:51:39Z

    Add RequiresTimestampControl category and tag some tests

commit 55efae45936d602ec1fc4ed47551762a82bc4e91
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-09T16:58:19Z

    Exclude groups from Flink batch integration tests

commit 99fe94c36ac55cae30ff68e7a7a00180ed69e72a
Author: Kenneth Knowles <[email protected]>
Date:   2016-05-09T18:08:57Z

    fixup! fix checkstyle of FlattenTest

commit 1ee56853e025753cc32b6a76d0551f22b387980a
Author: Aljoscha Krettek <[email protected]>
Date:   2016-05-10T11:53:03Z

    [BEAM-270] Support Timestamps/Windows in Flink Batch
    
    With this change we always use WindowedValue<T> for the underlying Flink
    DataSets instead of just T. This allows us to support windowing as well.
    
    This also changes a lot of other things enabled by the above:
    
     - Use WindowedValue throughout
     - Add proper translation for Window.into()
     - Make side inputs window aware
     - Make GroupByKey and Combine transformations window aware, this
       includes support for merging windows. GroupByKey is implemented as a
       Combine with a concatenating CombineFn, for simplicity
    
    This removes Flink specific transformations for things that are handled
    by builtin sources/sinks, among other things this:
    
     - Removes special translation for AvroIO.Read/Write and
       TextIO.Read/Write
     - Removes special support for Write.Bound, this was not working properly
       and is now handled by the Beam machinery that uses DoFns for this
     - Removes special translation for binary Co-Group, the code was still
       in there but was never used
    
    With this change all RunnableOnService tests run on Flink Batch.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
