This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3fd92d0 [BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK new d93c591 Merge pull request #14667:[BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK 3fd92d0 is described below commit 3fd92d0df24f671a9f5705860250acc3afd11e47 Author: Jan Lukavsky <je...@seznam.cz> AuthorDate: Wed Apr 28 09:38:54 2021 +0200 [BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK --- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 40 ++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 88d40ac..fd849a5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -70,8 +70,10 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams; import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matcher; import org.joda.time.Duration; @@ -585,6 +587,44 @@ public class GroupByKeyTest implements Serializable { } @Test + @Category(ValidatesRunner.class) + public void testRewindowWithTimestampCombiner() { + PCollection<KV<String, Integer>> input = + p.apply( + Create.timestamped( + TimestampedValue.of(KV.of("foo", 1), new Instant(1)), + TimestampedValue.of(KV.of("foo", 4), new Instant(4)), + TimestampedValue.of(KV.of("bar", 3), new Instant(3)), + TimestampedValue.of(KV.of("foo", 9), new Instant(9)))) + .apply( + "GlobalWindows", + Window.<KV<String, Integer>>configure() + .withTimestampCombiner(TimestampCombiner.LATEST)); + + PCollection<KV<String, Integer>> result = + input + .apply(GroupByKey.create()) + .apply( + MapElements.into( + TypeDescriptors.kvs( + TypeDescriptors.strings(), TypeDescriptors.integers())) + .via(kv -> KV.of(kv.getKey(), sum(kv.getValue())))) + .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(1)))); + + PAssert.that(result) + .inWindow(new IntervalWindow(new Instant(9), new Instant(10))) + .containsInAnyOrder(KV.of("foo", 14)) + .inWindow(new IntervalWindow(new Instant(3), new Instant(4))) + .containsInAnyOrder(KV.of("bar", 3)); + + p.run(); + } + + private static int sum(Iterable<Integer> parts) { + return Streams.stream(parts).mapToInt(e -> e).sum(); + } + + @Test @Category(NeedsRunner.class) public void testIdentityWindowFnPropagation() {