[ https://issues.apache.org/jira/browse/BEAM-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523690#comment-16523690 ]

Logan HAUSPIE commented on BEAM-4643:
-------------------------------------

If this issue is accepted, I would like to submit the PR.

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Kenneth Knowles
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{    .inEarlyPanes(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window triggered early twice: (black, 1) + (black, 1)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2)) // Then triggered again on reaching the watermark (no additional data)}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10)); // And then fired by receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes.
>  
> The workaround I found is to filter the PCollection to keep only the EARLY elements with this method:
> {{import org.apache.beam.sdk.transforms.DoFn;}}
> {{import org.apache.beam.sdk.transforms.Filter;}}
> {{import org.apache.beam.sdk.transforms.ParDo;}}
> {{import org.apache.beam.sdk.transforms.windowing.BoundedWindow;}}
> {{import org.apache.beam.sdk.transforms.windowing.PaneInfo;}}
> {{import org.apache.beam.sdk.values.PCollection;}}
> {{import org.apache.beam.sdk.values.ValueInSingleWindow;}}
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                  @ProcessElement}}
> {{                  public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                    // Carry the element's timestamp, window and pane along so the pane timing can be filtered on}}
> {{                    c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()));}}
> {{                  }}}
> {{              }}}
> {{          )}}
> {{      )}}
> {{  .setCoder(}}
> {{      ValueInSingleWindow.Coder.of(}}
> {{          values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{      )}}
> {{  )}}
> {{  // Explicit lambda parameter type so the compiler can infer Filter's element type}}
> {{  .apply(Filter.by((ValueInSingleWindow<T> a) -> a.getPane().getTiming() == timing))}}
> {{  .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{      ParDo.of(}}
> {{          new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{            @ProcessElement}}
> {{            public void processElement(ProcessContext c) {}}
> {{              // Output keeps the input's timestamp, window and pane}}
> {{              c.output(c.element().getValue());}}
> {{            }}}
> {{          }}}
> {{      ));}}
> {{  return filtered;}}
> {{}}}
>  
> And then check all the panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{    .inWindow(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 2))}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        .containsInAnyOrder(KV.of("black", 10));}}
>  
> But it's a bit overkill: two ParDos, a dedicated coder and a Filter just to select the EARLY panes.
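
To pin down what the proposed `inEarlyPanes` check would select, here is a plain-Java model with no Beam dependency (a sketch, not Beam's PAssert internals). Each output carries its window and pane timing. The check keeps the outputs whose window matches and whose timing is EARLY, then compares them as a multiset, which is what the workaround above achieves with `filter` followed by `inWindow`. `EarlyPaneModel`, `Timing`, `Window`, `Output`, `Step`, `run`, `select` and `containsInAnyOrder` are hypothetical names for illustration. The step list is an assumption that replays the scenario from the issue in accumulating mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EarlyPaneModel {
    // Hypothetical subset of the PaneInfo.Timing values; not the Beam enum itself.
    enum Timing { EARLY, ON_TIME, LATE }

    // Hypothetical stand-in for IntervalWindow, covering [startMinute, endMinute).
    record Window(int startMinute, int endMinute) {}

    // One emitted pane: the window and timing it fired with, and the (key, sum) it carries.
    record Output(Window window, Timing timing, Map.Entry<String, Integer> kv) {}

    // A step is either an input value for the key or a trigger firing.
    record Step(Integer value, Timing firing) {
        static Step add(int v) { return new Step(v, null); }
        static Step fire(Timing t) { return new Step(null, t); }
    }

    // Accumulating mode for one key in one window: each firing re-emits the running sum.
    static List<Output> run(Window window, String key, List<Step> steps) {
        List<Output> outputs = new ArrayList<>();
        int sum = 0;
        for (Step s : steps) {
            if (s.firing() == null) {
                sum += s.value();
            } else {
                outputs.add(new Output(window, s.firing(), Map.entry(key, sum)));
            }
        }
        return outputs;
    }

    // Keep the values emitted in the given window with the given pane timing.
    static List<Map.Entry<String, Integer>> select(List<Output> outputs, Window window, Timing timing) {
        List<Map.Entry<String, Integer>> selected = new ArrayList<>();
        for (Output o : outputs) {
            if (o.window().equals(window) && o.timing() == timing) {
                selected.add(o.kv());
            }
        }
        return selected;
    }

    // Order-insensitive comparison that still counts duplicates (multiset equality).
    static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
        Map<T, Integer> counts = new HashMap<>();
        for (T a : actual) {
            counts.merge(a, 1, Integer::sum);
        }
        for (T e : expected) {
            Integer c = counts.get(e);
            if (c == null) {
                return false;
            }
            if (c == 1) {
                counts.remove(e);
            } else {
                counts.put(e, c - 1);
            }
        }
        return counts.isEmpty();
    }

    static final Window WINDOW_05_20 = new Window(5, 20);

    // Replays the issue's scenario: two early firings, one on-time firing, one late firing.
    static List<Output> exampleOutputs() {
        return run(WINDOW_05_20, "black", List.of(
            Step.add(1), Step.fire(Timing.EARLY),
            Step.add(1), Step.fire(Timing.EARLY),
            Step.fire(Timing.ON_TIME),              // watermark passes, no new data
            Step.add(8), Step.fire(Timing.LATE)));  // late element (black, 8)
    }

    public static void main(String[] args) {
        List<Output> outputs = exampleOutputs();
        // Model of inEarlyPanes(window).containsInAnyOrder((black, 1), (black, 2))
        System.out.println(containsInAnyOrder(
            select(outputs, WINDOW_05_20, Timing.EARLY),
            List.of(Map.entry("black", 1), Map.entry("black", 2))));
        System.out.println(containsInAnyOrder(
            select(outputs, WINDOW_05_20, Timing.ON_TIME),
            List.of(Map.entry("black", 2))));
        System.out.println(containsInAnyOrder(
            select(outputs, WINDOW_05_20, Timing.LATE),
            List.of(Map.entry("black", 10))));
    }
}
```

Running `main` prints `true` three times. The model picks the final pane by its LATE timing only because, in this scenario, the late firing is the last one. As far as I know, Beam's `inFinalPane` selects the pane flagged as last (`PaneInfo.isLast()`) rather than filtering on timing.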



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
