This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 982197ce985197e4153d79938db247c84d708fc7
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue May 7 09:35:46 2019 +0200

    Update windowAssignTest
---
 .../translation/batch/WindowAssignTest.java | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 61da3ea..3011d88 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -27,13 +27,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -50,20 +48,19 @@ public class WindowAssignTest implements Serializable {
     p = Pipeline.create(options);
   }
 
-  @Ignore
   @Test
   public void testWindowAssign() {
-    PCollection<KV<Integer, Integer>> input =
+    PCollection<Integer> input =
         p.apply(
                 Create.timestamped(
-                    TimestampedValue.of(KV.of(1, 1), new Instant(1)),
-                    TimestampedValue.of(KV.of(1, 2), new Instant(2)),
-                    TimestampedValue.of(KV.of(1, 3), new Instant(3)),
-                    TimestampedValue.of(KV.of(1, 4), new Instant(10)),
-                    TimestampedValue.of(KV.of(1, 5), new Instant(11))))
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(10)),
+                    TimestampedValue.of(5, new Instant(11))))
             .apply(Window.into(FixedWindows.of(Duration.millis(10))))
-            .apply(Sum.integersPerKey());
-    PAssert.that(input).containsInAnyOrder(KV.of(1, 6), KV.of(1, 9));
+            .apply(Sum.integersGlobally().withoutDefaults());
+    PAssert.that(input).containsInAnyOrder(6, 9);
     p.run();
   }
 }
