This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd92d0  [BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK
     new d93c591  Merge pull request #14667:[BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK
3fd92d0 is described below

commit 3fd92d0df24f671a9f5705860250acc3afd11e47
Author: Jan Lukavsky <je...@seznam.cz>
AuthorDate: Wed Apr 28 09:38:54 2021 +0200

    [BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK
---
 .../apache/beam/sdk/transforms/GroupByKeyTest.java | 40 ++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 88d40ac..fd849a5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,8 +70,10 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matcher;
 import org.joda.time.Duration;
@@ -585,6 +587,44 @@ public class GroupByKeyTest implements Serializable {
     }
 
     @Test
+    @Category(ValidatesRunner.class)
+    public void testRewindowWithTimestampCombiner() {
+      PCollection<KV<String, Integer>> input =
+          p.apply(
+                  Create.timestamped(
+                      TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+                      TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+                      TimestampedValue.of(KV.of("bar", 3), new Instant(3)),
+                      TimestampedValue.of(KV.of("foo", 9), new Instant(9))))
+              .apply(
+                  "GlobalWindows",
+                  Window.<KV<String, Integer>>configure()
+                      .withTimestampCombiner(TimestampCombiner.LATEST));
+
+      PCollection<KV<String, Integer>> result =
+          input
+              .apply(GroupByKey.create())
+              .apply(
+                  MapElements.into(
+                          TypeDescriptors.kvs(
+                              TypeDescriptors.strings(), TypeDescriptors.integers()))
+                      .via(kv -> KV.of(kv.getKey(), sum(kv.getValue()))))
+              .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(1))));
+
+      PAssert.that(result)
+          .inWindow(new IntervalWindow(new Instant(9), new Instant(10)))
+          .containsInAnyOrder(KV.of("foo", 14))
+          .inWindow(new IntervalWindow(new Instant(3), new Instant(4)))
+          .containsInAnyOrder(KV.of("bar", 3));
+
+      p.run();
+    }
+
+    private static int sum(Iterable<Integer> parts) {
+      return Streams.stream(parts).mapToInt(e -> e).sum();
+    }
+
+    @Test
     @Category(NeedsRunner.class)
     public void testIdentityWindowFnPropagation() {
 

Reply via email to