GitHub user aljoscha opened a pull request:
https://github.com/apache/incubator-beam/pull/328
[BEAM-270] Support Timestamps/Windows in Flink Batch
This change sits on top of #291
With this we can run all of the `RunnableOnService` tests on the Flink
batch runner. Almost all of our own Flink-specific IT cases fail now because
they verified the older, incomplete support for the Beam model. I did not
remove them, since we should discuss whether we want to keep all of them or only
those that test a specific integration with Beam. If we want to keep them, they
have to be fixed.
For now `CombineTest.testSessionsCombineWithContext` fails. I'm inserting a
pre-shuffle combine phase, but with merging windows the correct, final window in
which an element will reside is not yet known in the pre-shuffle combine. This
means that we don't get the correct side input for those values in the
`CombineFnWithContext`. To fix the failing test I can get rid of the
pre-shuffle combine; this means that we have more network traffic but are
correct. What are your thoughts on this?
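To illustrate why the final window is not known before the shuffle, here is a minimal, self-contained sketch. It is not code from this pull request: the class `SessionMergeSketch`, its methods, the gap of 10, and the timestamps are all hypothetical, and it assumes session windows are half-open intervals `[timestamp, timestamp + gap)` that merge whenever they overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Illustration only; hypothetical names, not code from this PR. */
final class SessionMergeSketch {

  /** The initial session window of one element: [timestamp, timestamp + gap). */
  static long[] sessionFor(long timestamp, long gap) {
    return new long[] {timestamp, timestamp + gap};
  }

  /** Merges overlapping half-open intervals, as session merging would for a single key. */
  static List<long[]> merge(List<long[]> windows) {
    List<long[]> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] w : sorted) {
      long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w[0] < last[1]) {
        last[1] = Math.max(last[1], w[1]); // overlaps: extend the current session
      } else {
        merged.add(new long[] {w[0], w[1]}); // disjoint: start a new session
      }
    }
    return merged;
  }

  /** Renders windows as text, e.g. "[0,10)[12,22)". */
  static String show(List<long[]> windows) {
    StringBuilder sb = new StringBuilder();
    for (long[] w : windows) {
      sb.append("[").append(w[0]).append(",").append(w[1]).append(")");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    long gap = 10;
    // Same key, but the elements sit on two different partitions before the shuffle.
    List<long[]> partitionA = Arrays.asList(sessionFor(0, gap), sessionFor(12, gap));
    List<long[]> partitionB = Collections.singletonList(sessionFor(6, gap));

    System.out.println("pre-shuffle A: " + show(merge(partitionA)));
    System.out.println("pre-shuffle B: " + show(merge(partitionB)));

    List<long[]> all = new ArrayList<>(partitionA);
    all.addAll(partitionB);
    System.out.println("post-shuffle:  " + show(merge(all)));
  }
}
```

In this sketch partition A still holds two separate windows, `[0,10)` and `[12,22)`, before the shuffle, while all three elements end up in the single window `[0,22)` once they are merged together. A side input looked up for `[0,10)` in the pre-shuffle phase would therefore be looked up for the wrong window.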
For GroupByKey/Combine.PerKey I'm doing a shuffle by key and then an
in-memory pass over the windows. For non-merging windows this could also be
changed to do a shuffle by key-and-window, but then we would have to explode all
windows on the send side, i.e. emit one record per window that an element is in.
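The trade-off between the two shuffle strategies can be sketched as follows. This is a toy model under stated assumptions, not the runner's implementation: `ShuffleStrategySketch`, `Element`, the string window names, and the summing combine are all hypothetical, and in-memory maps stand in for the actual shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only; hypothetical names, not the runner's actual classes. */
final class ShuffleStrategySketch {

  /** A value with its key and the (non-merging) windows it was assigned to. */
  static final class Element {
    final String key;
    final int value;
    final List<String> windows;

    Element(String key, int value, String... windows) {
      this.key = key;
      this.value = value;
      this.windows = Arrays.asList(windows);
    }
  }

  /** Shuffle by key, then sum per window in one in-memory pass over each key group. */
  static Map<String, Map<String, Integer>> byKeyThenWindows(List<Element> input) {
    Map<String, List<Element>> shuffled = new TreeMap<>();
    for (Element e : input) {
      shuffled.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e); // one record per element
    }
    Map<String, Map<String, Integer>> result = new TreeMap<>();
    for (Map.Entry<String, List<Element>> group : shuffled.entrySet()) {
      Map<String, Integer> perWindow = new TreeMap<>();
      for (Element e : group.getValue()) {
        for (String w : e.windows) {
          perWindow.merge(w, e.value, Integer::sum);
        }
      }
      result.put(group.getKey(), perWindow);
    }
    return result;
  }

  /** Explode on the send side: one (key, window, value) record per window of each element. */
  static List<String[]> explode(List<Element> input) {
    List<String[]> records = new ArrayList<>();
    for (Element e : input) {
      for (String w : e.windows) {
        records.add(new String[] {e.key, w, Integer.toString(e.value)});
      }
    }
    return records;
  }

  /** Shuffle by key-and-window: each exploded record lands directly in its (key, window) group. */
  static Map<String, Integer> byKeyAndWindow(List<Element> input) {
    Map<String, Integer> result = new TreeMap<>();
    for (String[] r : explode(input)) {
      result.merge(r[0] + "/" + r[1], Integer.parseInt(r[2]), Integer::sum);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Element> input = Arrays.asList(
        new Element("a", 1, "w1", "w2"),
        new Element("a", 2, "w2", "w3"));
    System.out.println("by key, then windows: " + byKeyThenWindows(input));
    System.out.println("by key and window:    " + byKeyAndWindow(input));
    System.out.println("records shuffled: " + input.size() + " vs " + explode(input).size());
  }
}
```

Both strategies produce the same per-window sums here, but the exploded variant sends four records for two input elements because each element sits in two windows.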
**This is the text from the commit in question:**
[BEAM-270] Support Timestamps/Windows in Flink Batch
With this change we always use WindowedValue<T> for the underlying Flink
DataSets instead of just T. This allows us to support windowing as well.
This also changes a lot of other things enabled by the above:
- Use WindowedValue throughout
- Add proper translation for Window.into()
- Make side inputs window aware
- Make GroupByKey and Combine transformations window aware; this
includes support for merging windows. GroupByKey is implemented as a
Combine with a concatenating CombineFn, for simplicity
This removes Flink-specific transformations for things that are handled
by built-in sources/sinks. Among other things, this:
- Removes special translation for AvroIO.Read/Write and
TextIO.Read/Write
- Removes special support for Write.Bound, which was not working properly
and is now handled by the Beam machinery that uses DoFns for this
- Removes special translation for binary Co-Group; the code was still
in there but was never used
With this change all RunnableOnService tests run on Flink Batch.
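As a rough illustration of the idea that GroupByKey can be expressed as a Combine with a concatenating CombineFn, here is a small stand-alone sketch. It does not use Beam or Flink: `SimpleCombineFn` is a hypothetical, pared-down interface whose method names follow Beam's `CombineFn`, and `Concatenate`, `combinePerKey`, and `kv` are illustrative helpers that ignore windows entirely.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only; a simplified stand-in, not Beam's CombineFn or this PR's code. */
final class ConcatCombineSketch {

  /** Hypothetical, pared-down combine contract (method names follow Beam's CombineFn). */
  interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  /** Concatenating combine: the accumulator is simply the list of all inputs seen so far. */
  static final class Concatenate<T> implements SimpleCombineFn<T, List<T>, List<T>> {
    @Override public List<T> createAccumulator() {
      return new ArrayList<>();
    }
    @Override public List<T> addInput(List<T> accumulator, T input) {
      accumulator.add(input);
      return accumulator;
    }
    @Override public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
      List<T> merged = createAccumulator();
      for (List<T> a : accumulators) {
        merged.addAll(a); // this is where partial results would be joined
      }
      return merged;
    }
    @Override public List<T> extractOutput(List<T> accumulator) {
      return accumulator;
    }
  }

  /** Builds a (key, value) pair. */
  static <V> Map.Entry<String, V> kv(String key, V value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  /** Per-key combine over (key, value) pairs; with Concatenate it behaves like a group-by-key. */
  static <V, A, O> Map<String, O> combinePerKey(
      List<Map.Entry<String, V>> input, SimpleCombineFn<V, A, O> fn) {
    Map<String, A> accumulators = new TreeMap<>();
    for (Map.Entry<String, V> e : input) {
      A acc = accumulators.get(e.getKey());
      if (acc == null) {
        acc = fn.createAccumulator();
      }
      accumulators.put(e.getKey(), fn.addInput(acc, e.getValue()));
    }
    Map<String, O> output = new TreeMap<>();
    for (Map.Entry<String, A> e : accumulators.entrySet()) {
      output.put(e.getKey(), fn.extractOutput(e.getValue()));
    }
    return output;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = Arrays.asList(kv("a", 1), kv("b", 2), kv("a", 3));
    System.out.println(combinePerKey(input, new Concatenate<Integer>()));
  }
}
```

The appeal of this design is that a single window-aware combine code path can serve both transformations; the cost is that the "accumulator" for a grouping is the full list of values.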
R: @mxm for Flink review
R: @kennknowles, you are probably interested in how the shuffle/reduce works
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/incubator-beam flink-windowed-value-batch
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/328.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #328
----
commit b7335f9176e72c1dbd8f30f0b770d290e508dd9c
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-02T20:11:12Z
Add TestFlinkPipelineRunner to FlinkRunnerRegistrar
This makes the runner available for selection by integration tests.
commit 093dc3ec186bc16fd070b09c1678a87dd8f6b47e
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-02T21:04:20Z
Configure RunnableOnService tests for Flink in batch mode
Today Flink batch supports only global windows. This is a situation we
intend our build to allow, eventually via JUnit category filtering.
For now all the test classes that use non-global windows are excluded
entirely via maven configuration. In the future, it should be on a
per-test-method basis.
commit 3cb997342fc6a8230afcbbb84fb1742a53ef1683
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-02T21:29:30Z
Add Window.Bound translator to Flink batch
This adds a Window.Bound translator that allows only
GlobalWindows. It is a temporary measure, but one that
brings the Flink batch translator in line with the
Beam model - instead of "ignoring" windows, the GBK
is a perfectly valid GBK for GlobalWindows.
Previously, the SDK's runner test suite would fail
due to the lack of a translator - now some of them
will fail due to windowing support, but others have
a chance.
commit 6d00cba87a16ff8f2318db87dab23cc09b7d1c20
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T06:26:50Z
Fix Dangling Flink DataSets
commit af7f8580024466dd85cca8a2c070fd1db67490d4
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T07:38:55Z
Add hamcrest dep to Flink Runner
commit abad0377e31235134a735e3a2012199bd98b6b16
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T17:32:33Z
Add RequiresFixedWindows test category
commit b98ee40a9ea32ec1e5f401eacd2300573cea4b7d
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T17:33:05Z
Exclude RequiresFixedWindows test category from Flink batch tests
commit 0cc7363f8324a3ee0aee687d6e433d7db34de093
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T11:09:28Z
Fix Flink Create and GroupByNullKeyTest, Remove Special VoidSerializer
commit 5fa21dfa04c4d2cd259af01218e686d55be70b5c
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T11:31:18Z
Fix faulty Flink Flatten with empty PCollectionList
commit 30818e8f09375bbaa1691962c1a61b7d9206ef86
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T14:39:02Z
Fix Flink Batch Partial Combine/Combine
We're now using a PerKeyCombineFnRunner for all interaction with the
CombineFn. This required adding a proper ProcessContext in
FlinkReduceFunction and FlinkPartialReduceFunction, along with adding
support for side inputs there.
commit e663de12a9e1fe9dde873d0f4b1c4da1d6819e10
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-06T14:43:31Z
Disable MaybeEmptyTestITCase
This does not work because Flink Batch does not allow sending null
elements. This is a pretty deep thing and hard to fix.
In an earlier commit I removed the special TypeSerializer for VoidCoder.
Before, we got away with it by always intercepting the VoidCoder and wrapping
it in a TypeSerializer that would always emit a VoidValue instead of a proper
null. If the user fn reads this value, however, it is not consistent with how
it should behave.
commit 130665531db6725b2e2bae3d718176428a099c76
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T17:54:41Z
Remove unused threadCount from integration tests
commit c3f71e2742a5a0b15601b35f1c15164e14d068d6
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T17:55:16Z
Disable Flink streaming integration tests for now
commit a0376fffea1b363b07adbe25bc87a75a09a98cd5
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T18:19:33Z
Change PAssert's dummy inputs from (Void) null to integer 0
commit d60229d7dc79b42e721fe6494939cd8fc3eeefa7
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-06T19:49:55Z
Special casing job exec AssertionError in TestFlinkPipelineRunner
commit 054a42cbaf6c437facfe958f8a2e5dadb59d3e9a
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-09T10:40:27Z
Use Int instead of Void in Combine.globally default insertion
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
commit 8f7f9fbec1a23b098973d15cb60ee3bc55749c2f
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-09T12:07:57Z
Fix accumulator update in Flink Reduce Function
commit 7abf64badb0c9c04b21ed91a96c15200d8b4513c
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-09T12:08:22Z
Use Int instead of Void as dummy key in Combine.globally
This makes it work with runners that don't support sending null
values.
commit 6023361838363640815af370aa13572be9571b9c
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-09T12:56:36Z
Use Int instead of Void in Sample
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
commit fa0f7692e86ffc8057fd6559aa16ca7a89d567fb
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-09T12:57:26Z
Use Int instead of Void in FlattenTest
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
commit 19c4cd3332597462c1c6036d0ce2f4429f3bf408
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-09T16:51:39Z
Add RequiresTimestampControl category and tag some tests
commit 55efae45936d602ec1fc4ed47551762a82bc4e91
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-09T16:58:19Z
Exclude groups from Flink batch integration tests
commit 99fe94c36ac55cae30ff68e7a7a00180ed69e72a
Author: Kenneth Knowles <[email protected]>
Date: 2016-05-09T18:08:57Z
fixup! fix checkstyle of FlattenTest
commit 1ee56853e025753cc32b6a76d0551f22b387980a
Author: Aljoscha Krettek <[email protected]>
Date: 2016-05-10T11:53:03Z
[BEAM-270] Support Timestamps/Windows in Flink Batch
----