Logan HAUSPIE created BEAM-4643:
-----------------------------------

             Summary: Allow to check early panes of a window
                 Key: BEAM-4643
                 URL: https://issues.apache.org/jira/browse/BEAM-4643
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core, testing
    Affects Versions: 2.5.0
            Reporter: Logan HAUSPIE
            Assignee: Kenneth Knowles


What I would like to do is:

{{PAssert.that(teamScores)}}
 {{   .inEarlyPanes(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window triggered 2 times early: (black, 1) + (black, 1)}}
 {{   .inOnTimePane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 2)) // Then triggered again when the watermark is reached (no additional data)}}
 {{   .inFinalPane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 10)); // And then fired on receiving late data (black, 8)}}

NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes

 

The workaround I found is to filter the PCollection to keep only the EARLY 
elements with this method:

{{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
{{  PCollection<T> filtered = values}}
{{      .apply("Wrap into ValueInSingleWindow for filtering",}}
{{          ParDo.of(}}
{{              new DoFn<T, ValueInSingleWindow<T>>() {}}
{{                @ProcessElement}}
{{                public void processElement(ProcessContext c, BoundedWindow window) {}}
{{                  c.outputWithTimestamp(}}
{{                      ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()),}}
{{                      c.timestamp());}}
{{                }}}
{{              }}}
{{          )}}
{{      )}}
{{      .setCoder(}}
{{          ValueInSingleWindow.Coder.of(}}
{{              values.getCoder(),}}
{{              values.getWindowingStrategy().getWindowFn().windowCoder()}}
{{          )}}
{{      )}}
{{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
{{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
{{          ParDo.of(}}
{{              new DoFn<ValueInSingleWindow<T>, T>() {}}
{{                @ProcessElement}}
{{                public void processElement(ProcessContext c, BoundedWindow window) {}}
{{                  c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
{{                }}}
{{              }}}
{{          ));}}
{{  return filtered;}}
{{}}}

 

And then check all panes of the window on the filtered PCollection:

{{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
{{   .inWindow(intervalWindow(05, 20))}}
{{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
{{PAssert.that(teamScores)}}
{{   .inOnTimePane(intervalWindow(05, 20))}}
{{          .containsInAnyOrder(KV.of("black", 2))}}
{{   .inFinalPane(intervalWindow(05, 20))}}
{{          .containsInAnyOrder(KV.of("black", 10));}}

 

But it's a bit overkill.
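
To make the intent of the filter concrete, here is a minimal plain-Java sketch of the same idea: tag each fired element with the timing of its pane, then keep only the requested timing. It does not use Beam at all. The class {{PaneTimingModel}}, the type {{Fired}} and the helper methods are hypothetical names introduced for illustration; only the enum constant names mirror Beam's {{PaneInfo.Timing}}. Marking the last firing as LATE is an assumption based on the description above (it was caused by late data).

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model only: no Beam classes are used here.
class PaneTimingModel {
  // Same constant names as Beam's PaneInfo.Timing.
  enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

  // Hypothetical stand-in for one element plus the timing of the pane that emitted it.
  static final class Fired {
    final String key;
    final int score;
    final Timing timing;

    Fired(String key, int score, Timing timing) {
      this.key = key;
      this.score = score;
      this.timing = timing;
    }
  }

  // Keeps only the elements emitted by panes with the requested timing.
  static List<Fired> filter(List<Fired> values, Timing timing) {
    return values.stream()
        .filter(f -> f.timing == timing)
        .collect(Collectors.toList());
  }

  // Extracts the scores, in order, to make comparisons easy.
  static List<Integer> scores(List<Fired> values) {
    return values.stream().map(f -> f.score).collect(Collectors.toList());
  }

  // The firings described in this issue for the 5 to 20 minute window.
  // Assumption: the firing caused by the late (black, 8) element is modelled as LATE.
  static List<Fired> teamScores() {
    return List.of(
        new Fired("black", 1, Timing.EARLY),
        new Fired("black", 2, Timing.EARLY),
        new Fired("black", 2, Timing.ON_TIME),
        new Fired("black", 10, Timing.LATE));
  }

  public static void main(String[] args) {
    // Print the scores seen for each pane timing.
    for (Timing t : Timing.values()) {
      System.out.println(t + " -> " + scores(filter(teamScores(), t)));
    }
  }
}
```

In this model, filtering on EARLY leaves exactly the two early values that the requested {{inEarlyPanes}} assertion would be expected to check.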



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
