Tidy a troublesome TestStreamTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/555ba40d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/555ba40d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/555ba40d Branch: refs/heads/master Commit: 555ba40d5934694476b5337b88276625252d684e Parents: a593e49 Author: Kenneth Knowles <[email protected]> Authored: Fri Oct 27 10:51:38 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Nov 13 15:03:21 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/TestStreamTest.java | 25 +++++++++++--------- 1 file changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/555ba40d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index bef6aa0..2f147dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -240,21 +240,24 @@ public class TestStreamTest implements Serializable { @Category({NeedsRunner.class, UsesTestStream.class}) public void testElementsAtAlmostPositiveInfinity() { Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); - TestStream<String> stream = TestStream.create(StringUtf8Coder.of()) - .addElements(TimestampedValue.of("foo", endOfGlobalWindow), - TimestampedValue.of("bar", endOfGlobalWindow)) - .advanceWatermarkToInfinity(); + TestStream<String> stream = + TestStream.create(StringUtf8Coder.of()) + .addElements( + TimestampedValue.of("foo", endOfGlobalWindow), + TimestampedValue.of("bar", endOfGlobalWindow)) + .advanceWatermarkToInfinity(); FixedWindows windows = FixedWindows.of(Duration.standardHours(6)); - PCollection<String> windowedValues = p.apply(stream) - .apply(Window.<String>into(windows)) - .apply(WithKeys.<Integer, String>of(1)) - .apply(GroupByKey.<Integer, String>create()) - .apply(Values.<Iterable<String>>create()) - .apply(Flatten.<String>iterables()); + PCollection<String> windowedValues = + p.apply(stream) + .apply(Window.<String>into(windows)) + .apply(WithKeys.<Integer, String>of(1)) + .apply(GroupByKey.<Integer, String>create()) + .apply(Values.<Iterable<String>>create()) + .apply(Flatten.<String>iterables()); PAssert.that(windowedValues) - .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp())) + .inWindow(windows.assignWindow(endOfGlobalWindow)) .containsInAnyOrder("foo", "bar"); p.run(); }
