[ 
https://issues.apache.org/jira/browse/BEAM-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Logan HAUSPIE updated BEAM-4643:
--------------------------------
    Description: 
What I would like to do is:

{{PAssert.that(teamScores)}}
 {{    .inEarlyPanes(intervalWindow(05, 20))}}
 {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window 
triggered early twice: (black, 1) + (black, 1)}}
 {{    .inOnTimePane(intervalWindow(05, 20))}}
 {{         .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by 
reaching the watermark (no additional data)}}
 {{    .inFinalPane(intervalWindow(05, 20))}}
 {{         .containsInAnyOrder(KV.of("black", 10))}}{{; // And then fired by 
receiving a late element (black, 8)}}

NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes.

 

The workaround I found is to filter the PCollection so that it keeps only the 
elements of a given pane timing (here EARLY), using this method:

{{public static <T> PCollection<T> filter(PCollection<T> values, 
PaneInfo.Timing timing) {}}
{{  PCollection<T> filtered = values}}
{{      .apply("Wrap into ValueInSingleWindow for filtering",}}
{{          ParDo.of(}}
{{              new DoFn<T, ValueInSingleWindow<T>>() {}}
{{                  @ProcessElement}}
{{                  public void processElement(ProcessContext c, BoundedWindow 
window) {}}
{{                    c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), 
c.timestamp(), window, c.pane()), c.timestamp());}}
{{                    }}}
{{              }}}
{{          )}}

{{      )}}
 {{      .setCoder(}}
 {{          ValueInSingleWindow.Coder.of(}}
 {{              values.getCoder(), 
values.getWindowingStrategy().getWindowFn().windowCoder()}}

{{          )}}

{{      )}}
{{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
{{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
{{          ParDo.of(}}
{{              new DoFn<ValueInSingleWindow<T>, T>() {}}
{{                @ProcessElement}}
{{                public void processElement(ProcessContext c, BoundedWindow 
window) {}}
{{                   c.outputWithTimestamp(c.element().getValue(), 
c.timestamp());}}
{{                }}}
{{          }}}
{{      ));}}
{{  return filtered;}}
{{ }}}

 

Then check all panes of the window:

{{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
 {{   .inWindow(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
 {{PAssert.that(teamScores)}}
 {{   .inOnTimePane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 2))}}
 {{   .inFinalPane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 10))}}{{;}}

 

But it's a bit overkill.

  was:
What I would like to do is:

{{PAssert.that(teamScores)}}
 {{   .inEarlyPanes(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // 
Window triggered early twice: (black, 1) + (black, 1)}}
 {{   .inOnTimePane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by 
reaching the watermark (no additional data)}}
 {{   .inFinalPane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 10))}}{{; // And then fired by 
receiving a late element (black, 8)}}

NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes.

 

The workaround I found is to filter the PCollection to keep only the EARLY 
elements with this method:

{{public static <T> PCollection<T> filter(PCollection<T> values, 
PaneInfo.Timing timing) {}}
 {{  PCollection<T> filtered = values}}
 {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
 {{          ParDo.of(}}
 {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
 {{                  @ProcessElement}}
 {{                  public void processElement(ProcessContext c, BoundedWindow 
window) {}}
 {{                    
c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), 
window, c.pane()), c.timestamp());}}
 {{                  }}}
 {{              }}}

{{          )}}

{{      )}}
 {{  .setCoder(}}
 {{      ValueInSingleWindow.Coder.of(}}
 {{          values.getCoder(), 
values.getWindowingStrategy().getWindowFn().windowCoder()}}

{{      )}}

{{  )}}
 {{  .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
 {{  .apply("Unwrap from ValueInSingleWindow for filtering",}}
 {{      ParDo.of(}}
 {{          new DoFn<ValueInSingleWindow<T>, T>() {}}
 {{            @ProcessElement}}
 {{            public void processElement(ProcessContext c, BoundedWindow 
window) {}}
 {{              c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
 {{            }}}
 {{          }}}

{{      ));}}
{{  return filtered;}}
 {{}}}

 

Then check all panes of the window:

{{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
 {{   .inWindow(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
 {{PAssert.that(teamScores)}}
 {{   .inOnTimePane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 2))}}
 {{   .inFinalPane(intervalWindow(05, 20))}}
 {{          .containsInAnyOrder(KV.of("black", 10))}}{{;}}

 

But it's a bit overkill.


> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Kenneth Knowles
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
>  {{    .inEarlyPanes(intervalWindow(05, 20))}}
>  {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // 
> Window triggered early twice: (black, 1) + (black, 1)}}
>  {{    .inOnTimePane(intervalWindow(05, 20))}}
>  {{         .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by 
> reaching the watermark (no additional data)}}
>  {{    .inFinalPane(intervalWindow(05, 20))}}
>  {{         .containsInAnyOrder(KV.of("black", 10))}}{{; // And then fired by 
> receiving a late element (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 
> minutes.
>  
> The workaround I found is to filter the PCollection to keep only the EARLY 
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, 
> PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                  @ProcessElement}}
> {{                  public void processElement(ProcessContext c, 
> BoundedWindow window) {}}
> {{                    
> c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), 
> window, c.pane()), c.timestamp());}}
> {{                    }}}
> {{              }}}
> {{          )}}
> {{      )}}
>  {{      .setCoder(}}
>  {{          ValueInSingleWindow.Coder.of(}}
>  {{              values.getCoder(), 
> values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{          )}}
> {{      )}}
> {{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow 
> window) {}}
> {{                   c.outputWithTimestamp(c.element().getValue(), 
> c.timestamp());}}
> {{                }}}
> {{          }}}
> {{      ));}}
> {{  return filtered;}}
> {{ }}}
>  
> Then check all panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
>  {{   .inWindow(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
>  {{PAssert.that(teamScores)}}
>  {{   .inOnTimePane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 2))}}
>  {{   .inFinalPane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 10))}}{{;}}
>  
> But it's a bit overkill.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to