This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3fd92d0 [BEAM-12343] GroupByKeyTest for changing WindowFn from
GlobalWindow after GBK
new d93c591 Merge pull request #14667:[BEAM-12343] GroupByKeyTest for
changing WindowFn from GlobalWindow after GBK
3fd92d0 is described below
commit 3fd92d0df24f671a9f5705860250acc3afd11e47
Author: Jan Lukavsky <[email protected]>
AuthorDate: Wed Apr 28 09:38:54 2021 +0200
[BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after
GBK
---
.../apache/beam/sdk/transforms/GroupByKeyTest.java | 40 ++++++++++++++++++++++
1 file changed, 40 insertions(+)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 88d40ac..fd849a5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,8 +70,10 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
@@ -585,6 +587,44 @@ public class GroupByKeyTest implements Serializable {
}
@Test
+ @Category(ValidatesRunner.class)
+ public void testRewindowWithTimestampCombiner() {
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.timestamped(
+ TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+ TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+ TimestampedValue.of(KV.of("bar", 3), new Instant(3)),
+ TimestampedValue.of(KV.of("foo", 9), new Instant(9))))
+ .apply(
+ "GlobalWindows",
+ Window.<KV<String, Integer>>configure()
+ .withTimestampCombiner(TimestampCombiner.LATEST));
+
+ PCollection<KV<String, Integer>> result =
+ input
+ .apply(GroupByKey.create())
+ .apply(
+ MapElements.into(
+ TypeDescriptors.kvs(
+ TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via(kv -> KV.of(kv.getKey(), sum(kv.getValue()))))
+ .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(1))));
+
+ PAssert.that(result)
+ .inWindow(new IntervalWindow(new Instant(9), new Instant(10)))
+ .containsInAnyOrder(KV.of("foo", 14))
+ .inWindow(new IntervalWindow(new Instant(3), new Instant(4)))
+ .containsInAnyOrder(KV.of("bar", 3));
+
+ p.run();
+ }
+
+ private static int sum(Iterable<Integer> parts) {
+ return Streams.stream(parts).mapToInt(e -> e).sum();
+ }
+
+ @Test
@Category(NeedsRunner.class)
public void testIdentityWindowFnPropagation() {