Fixes to timestamps in GroupAlsoByWindowsProperties. These properties had poor test coverage, so their timestamps were not updated alongside the new default for OutputTimeFn.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55aae464 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55aae464 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55aae464 Branch: refs/heads/master Commit: 55aae464530414f6e3ebe1103c32e39e6fc98a6f Parents: d94a6f1 Author: Kenneth Knowles <[email protected]> Authored: Tue May 24 13:11:40 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 24 13:11:40 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/GroupAlsoByWindowsProperties.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55aae464/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index 4518f9f..c4f3c8b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -191,8 +191,9 @@ public class GroupAlsoByWindowsProperties { CombineFn<Long, ?, Long> combineFn) throws Exception { - WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of( - SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))); + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); List<WindowedValue<KV<String, Long>>> result = 
runGABW(gabwFactory, windowingStrategy, "k", @@ -360,14 +361,14 @@ public class GroupAlsoByWindowsProperties { KvMatcher.isKv( equalTo("k"), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))), - 0, // aggregate timestamp + window(0, 15).maxTimestamp().getMillis(), // aggregate timestamp 0, // window start 15), // window end WindowMatchers.isSingleWindowedValue( KvMatcher.isKv( equalTo("k"), equalTo(combineFn.apply(ImmutableList.of(4L)))), - 15, // aggregate timestamp + window(15, 25).maxTimestamp().getMillis(), // aggregate timestamp 15, // window start 25))); // window end }
