Repository: beam Updated Branches: refs/heads/master be5b9347b -> 0d927ef6a
Add GroupByKey tests for Multiple & Merging windows. This gives explicit coverage of a GroupByKey whose elements are in multiple windows or in merging windows. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e947045 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e947045 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e947045 Branch: refs/heads/master Commit: 1e947045a54bd59b449fd56f8f5f50879b6d9c4c Parents: be5b934 Author: Thomas Groh <tg...@google.com> Authored: Mon Jul 17 13:38:11 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Jul 18 17:52:55 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/GroupByKeyTest.java | 156 +++++++++++++++---- 1 file changed, 122 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1e947045/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 4b5d5f5..8fcb4c0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -23,18 +23,20 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.empty; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import 
java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -56,9 +58,12 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -67,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Assert; @@ -82,13 +88,13 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) @SuppressWarnings({"rawtypes", "unchecked"}) -public class GroupByKeyTest { +public class GroupByKeyTest implements Serializable { @Rule - public final TestPipeline p = TestPipeline.create(); + public transient TestPipeline p = TestPipeline.create(); @Rule - public ExpectedException thrown = ExpectedException.none(); + public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(ValidatesRunner.class) @@ 
-109,27 +115,18 @@ public class GroupByKeyTest { PCollection<KV<String, Iterable<Integer>>> output = input.apply(GroupByKey.<String, Integer>create()); - PAssert.that(output) - .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey()); + SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> checker = + containsKvs( + kv("k1", 3, 4), + kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + kv("k2", 66, -33), + kv("k3", 0)); + PAssert.that(output).satisfies(checker); + PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); p.run(); } - static class AssertThatHasExpectedContentsForTestGroupByKey - implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, - Void> { - @Override - public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) { - assertThat(actual, containsInAnyOrder( - isKv(is("k1"), containsInAnyOrder(3, 4)), - isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE, - Integer.MIN_VALUE)), - isKv(is("k2"), containsInAnyOrder(66, -33)), - isKv(is("k3"), containsInAnyOrder(0)))); - return null; - } - } - @Test @Category(ValidatesRunner.class) public void testGroupByKeyAndWindows() { @@ -150,24 +147,115 @@ public class GroupByKeyTest { .apply(GroupByKey.<String, Integer>create()); PAssert.that(output) - .satisfies(new AssertThatHasExpectedContentsForTestGroupByKeyAndWindows()); + .satisfies( + containsKvs( + kv("k1", 3), + kv("k1", 4), + kv("k5", Integer.MAX_VALUE, Integer.MIN_VALUE), + kv("k2", 66), + kv("k2", -33), + kv("k3", 0))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L))) + .satisfies( + containsKvs(kv("k1", 3), kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), kv("k2", 66))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(5L), Duration.millis(5L))) + .satisfies(containsKvs(kv("k1", 4), kv("k2", -33), kv("k3", 0))); + + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testGroupByKeyMultipleWindows() { + 
PCollection<KV<String, Integer>> windowedInput = + p.apply( + Create.timestamped( + TimestampedValue.of(KV.of("foo", 1), new Instant(1)), + TimestampedValue.of(KV.of("foo", 4), new Instant(4)), + TimestampedValue.of(KV.of("bar", 3), new Instant(3)))) + .apply( + Window.<KV<String, Integer>>into( + SlidingWindows.of(Duration.millis(5L)).every(Duration.millis(3L)))); + + PCollection<KV<String, Iterable<Integer>>> output = + windowedInput.apply(GroupByKey.<String, Integer>create()); + + PAssert.that(output) + .satisfies( + containsKvs(kv("foo", 1, 4), kv("foo", 1), kv("foo", 4), kv("bar", 3), kv("bar", 3))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(-3L), Duration.millis(5L))) + .satisfies(containsKvs(kv("foo", 1))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L))) + .satisfies(containsKvs(kv("foo", 1, 4), kv("bar", 3))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(3L), Duration.millis(5L))) + .satisfies(containsKvs(kv("foo", 4), kv("bar", 3))); + + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testGroupByKeyMergingWindows() { + PCollection<KV<String, Integer>> windowedInput = + p.apply( + Create.timestamped( + TimestampedValue.of(KV.of("foo", 1), new Instant(1)), + TimestampedValue.of(KV.of("foo", 4), new Instant(4)), + TimestampedValue.of(KV.of("bar", 3), new Instant(3)), + TimestampedValue.of(KV.of("foo", 9), new Instant(9)))) + .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(4L)))); + + PCollection<KV<String, Iterable<Integer>>> output = + windowedInput.apply(GroupByKey.<String, Integer>create()); + + PAssert.that(output).satisfies(containsKvs(kv("foo", 1, 4), kv("foo", 9), kv("bar", 3))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(1L), new Instant(8L))) + .satisfies(containsKvs(kv("foo", 1, 4))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(3L), new Instant(7L))) + 
.satisfies(containsKvs(kv("bar", 3))); + PAssert.that(output) + .inWindow(new IntervalWindow(new Instant(9L), new Instant(13L))) + .satisfies(containsKvs(kv("foo", 9))); p.run(); } - static class AssertThatHasExpectedContentsForTestGroupByKeyAndWindows - implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, - Void> { + private static KV<String, Collection<Integer>> kv(String key, Integer... values) { + return KV.<String, Collection<Integer>>of(key, ImmutableList.copyOf(values)); + } + + private static SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> containsKvs( + KV<String, Collection<Integer>>... kvs) { + return new ContainsKVs(ImmutableList.copyOf(kvs)); + } + + /** + * A function that asserts that the input element contains the expected {@link KV KVs} in any + * order, where values appear in any order. + */ + private static class ContainsKVs + implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> { + private final List<KV<String, Collection<Integer>>> expectedKvs; + + private ContainsKVs(List<KV<String, Collection<Integer>>> expectedKvs) { + this.expectedKvs = expectedKvs; + } + @Override - public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) { - assertThat(actual, containsInAnyOrder( - isKv(is("k1"), containsInAnyOrder(3)), - isKv(is("k1"), containsInAnyOrder(4)), - isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE, - Integer.MIN_VALUE)), - isKv(is("k2"), containsInAnyOrder(66)), - isKv(is("k2"), containsInAnyOrder(-33)), - isKv(is("k3"), containsInAnyOrder(0)))); + public Void apply(Iterable<KV<String, Iterable<Integer>>> input) { + List<Matcher<? 
super KV<String, Iterable<Integer>>>> matchers = new ArrayList<>(); + for (KV<String, Collection<Integer>> expected : expectedKvs) { + Integer[] values = expected.getValue().toArray(new Integer[0]); + matchers.add(isKv(equalTo(expected.getKey()), containsInAnyOrder(values))); + } + assertThat(input, containsInAnyOrder(matchers.toArray(new Matcher[0]))); return null; } }