[ 
https://issues.apache.org/jira/browse/BEAM-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761003#comment-16761003
 ] 

Maximilian Michels commented on BEAM-4643:
------------------------------------------

Resolving this for 2.8.0, where it first appeared.

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Logan HAUSPIE
>            Priority: Minor
>             Fix For: 2.8.0
>
>   Original Estimate: 24h
>          Time Spent: 2h 10m
>  Remaining Estimate: 21h 50m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{    .inEarlyPanes(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window triggered twice early: (black, 1) + (black, 1)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2)) // Then triggered again when the watermark is reached (no additional data)}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10)); // And then fired on receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes.
>  
> The workaround I found is to filter the PCollection to keep only the EARLY 
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                  c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), c.timestamp());}}
> {{                }}}
> {{              }}}
> {{          )}}
> {{      )}}
> {{      .setCoder(}}
> {{          ValueInSingleWindow.Coder.of(}}
> {{              values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{          )}}
> {{      )}}
> {{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                  c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
> {{                }}}
> {{              }}}
> {{          ));}}
> {{  return filtered;}}
> {{}}}
>  
> And then check all panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{    .inWindow(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2))}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10));}}
>  
> But it's a bit overkill.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
