[ 
https://issues.apache.org/jira/browse/BEAM-4643?focusedWorklogId=152951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152951
 ]

ASF GitHub Bot logged work on BEAM-4643:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 22:46
            Start Date: 09/Oct/18 22:46
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #5811: [BEAM-4643] Allow to 
check early panes of a window
URL: https://github.com/apache/beam/pull/5811
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index cfcb16291ac..cedc594fd49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -250,6 +250,15 @@ public int hashCode() {
      */
     IterableAssert<T> inOnTimePane(BoundedWindow window);
 
+    /**
+     * Creates a new {@link IterableAssert} like this one, but with the 
assertion restricted to only
+     * run on the provided window across all panes that were produced by the 
arrival of early data.
+     *
+     * @return a new {@link IterableAssert} like this one but with the 
assertion only applied to the
+     *     specified window.
+     */
+    IterableAssert<T> inEarlyPane(BoundedWindow window);
+
     /**
      * Creates a new {@link IterableAssert} like this one, but with the 
assertion restricted to only
      * run on the provided window across all panes that were not produced by 
the arrival of late
@@ -336,6 +345,15 @@ public int hashCode() {
      */
     SingletonAssert<T> inOnTimePane(BoundedWindow window);
 
+    /**
+     * Creates a new {@link SingletonAssert} like this one, but with the 
assertion restricted to
+     * only run on the provided window, running the checker only on early 
panes for each key.
+     *
+     * @return a new {@link SingletonAssert} like this one but with the 
assertion only applied to
+     *     the specified window.
+     */
+    SingletonAssert<T> inEarlyPane(BoundedWindow window);
+
     /**
      * Asserts that the value in question is equal to the provided value, 
according to {@link
      * Object#equals}.
@@ -513,6 +531,11 @@ public PCollectionContentsAssert(
       return withPane(window, PaneExtractors.onTimePane());
     }
 
+    @Override
+    public PCollectionContentsAssert<T> inEarlyPane(BoundedWindow window) {
+      return withPane(window, PaneExtractors.earlyPanes());
+    }
+
     @Override
     public PCollectionContentsAssert<T> inCombinedNonLatePanes(BoundedWindow 
window) {
       return withPane(window, PaneExtractors.nonLatePanes());
@@ -700,6 +723,11 @@ public PCollectionSingletonIterableAssert(
       return withPanes(window, PaneExtractors.onTimePane());
     }
 
+    @Override
+    public PCollectionSingletonIterableAssert<T> inEarlyPane(BoundedWindow 
window) {
+      return withPanes(window, PaneExtractors.earlyPanes());
+    }
+
     @Override
     public PCollectionSingletonIterableAssert<T> 
inCombinedNonLatePanes(BoundedWindow window) {
       return withPanes(window, PaneExtractors.nonLatePanes());
@@ -805,6 +833,11 @@ private PCollectionViewAssert(
       return inPane(window, PaneExtractors.onTimePane());
     }
 
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> inEarlyPane(BoundedWindow 
window) {
+      return inPane(window, PaneExtractors.earlyPanes());
+    }
+
     private PCollectionViewAssert<ElemT, ViewT> inPane(
         BoundedWindow window,
         SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> 
paneExtractor) {
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 2aab957d902..a448694d3d8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -20,10 +20,12 @@
 
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
+import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,6 +34,7 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -46,8 +49,10 @@
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -328,4 +333,56 @@ public void 
testAdvanceWatermarkEqualToPositiveInfinityThrows() {
     thrown.expect(IllegalArgumentException.class);
     stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTestStream.class})
+  public void testEarlyPanesOfWindow() {
+    TestStream<Long> source =
+        TestStream.create(VarLongCoder.of())
+            .addElements(TimestampedValue.of(1L, new Instant(1000L)))
+            .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early 
pane
+            .addElements(TimestampedValue.of(2L, new Instant(2000L)))
+            .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early 
pane
+            .addElements(TimestampedValue.of(3L, new Instant(3000L)))
+            .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early 
pane
+            .advanceWatermarkToInfinity(); // Fire on-time pane
+
+    PCollection<KV<String, Long>> sum =
+        p.apply(source)
+            .apply(
+                
Window.<Long>into(FixedWindows.of(Duration.standardMinutes(30)))
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(
+                                AfterProcessingTime.pastFirstElementInPane()
+                                    .plusDelayOf(Duration.standardMinutes(5))))
+                    .accumulatingFiredPanes()
+                    .withAllowedLateness(Duration.ZERO))
+            .apply(
+                MapElements.into(
+                        TypeDescriptors.kvs(TypeDescriptors.strings(), 
TypeDescriptors.longs()))
+                    .via(v -> KV.of("key", v)))
+            .apply(Sum.longsPerKey());
+
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new 
Instant(0L).plus(Duration.standardMinutes(30)));
+
+    PAssert.that(sum)
+        .inEarlyPane(window)
+        .satisfies(
+            input -> {
+              assertThat(StreamSupport.stream(input.spliterator(), 
false).count(), is(3L));
+              return null;
+            })
+        .containsInAnyOrder(KV.of("key", 1L), KV.of("key", 3L), KV.of("key", 
6L))
+        .inOnTimePane(window)
+        .satisfies(
+            input -> {
+              assertThat(StreamSupport.stream(input.spliterator(), 
false).count(), is(1L));
+              return null;
+            })
+        .containsInAnyOrder(KV.of("key", 6L));
+
+    p.run().waitUntilFinish();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 152951)
            Time Spent: 2h 10m  (was: 2h)
    Remaining Estimate: 21h 50m  (was: 22h)

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Logan HAUSPIE
>            Priority: Minor
>   Original Estimate: 24h
>          Time Spent: 2h 10m
>  Remaining Estimate: 21h 50m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
>  {{    .inEarlyPanes(intervalWindow(05, 20))}}
>  {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // 
> Window triggered 2 times earlier (black, 1) + (black, 1)}}
>  {{    .inOnTimePane(intervalWindow(05, 20))}}
>  {{         .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by 
> reaching the watermark (no additional data)}}
>  {{    .inFinalPane(intervalWindow(05, 20))}}
>  {{         .containsInAnyOrder(KV.of("black", 10))}}{{; // And then fired by 
> receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 
> minutes
>  
> The workaround I found is to filter the PCollection to keep only the EARLY 
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, 
> PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                  @ProcessElement}}
> {{                  public void processElement(ProcessContext c, 
> BoundedWindow window) {}}
> {{                    
> c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), 
> window, c.pane()), c.timestamp());}}
> {{                    }}}
> {{              }}}
> {{          )}}
> {{      )}}
>  {{      .setCoder(}}
>  {{          ValueInSingleWindow.Coder.of(}}
>  {{              values.getCoder(), 
> values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{          )}}
> {{      )}}
> {{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow 
> window) {}}
> {{                   c.outputWithTimestamp(c.element().getValue(), 
> c.timestamp());}}
> {{                }}}
> {{          }}}
> {{      ));}}
> {{  return filtered;}}
> {{ }}}
>  
> And then check all the panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
>  {{   .inWindow(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
>  {{PAssert.that(teamScores)}}
>  {{   .inOnTimePane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 2))}}
>  {{   .inFinalPane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 10))}}{{;}}
>  
> But it's a bit overkill.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to