[ 
https://issues.apache.org/jira/browse/BEAM-4643?focusedWorklogId=152174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152174
 ]

ASF GitHub Bot logged work on BEAM-4643:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/18 09:32
            Start Date: 08/Oct/18 09:32
    Worklog Time Spent: 10m 
      Work Description: lhauspie commented on issue #5811: [BEAM-4643] Allow to 
check early panes of a window
URL: https://github.com/apache/beam/pull/5811#issuecomment-427772476
 
 
   @lukecwik, does this PR need any action from me, or anything else, before 
it can be merged?
   
   Thanks in advance ;)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 152174)
            Time Spent: 1.5h  (was: 1h 20m)
    Remaining Estimate: 22.5h  (was: 22h 40m)

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Logan HAUSPIE
>            Priority: Minor
>   Original Estimate: 24h
>          Time Spent: 1.5h
>  Remaining Estimate: 22.5h
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{    .inEarlyPanes(intervalWindow(05, 20))}}
> {{        // Window triggered 2 times early: (black, 1) + (black, 1)}}
> {{        .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2))}}
> {{    .inOnTimePane(intervalWindow(05, 20))}}
> {{        // Then triggered again by reaching the watermark (no additional data)}}
> {{        .containsInAnyOrder(KV.of("black", 2))}}
> {{    .inFinalPane(intervalWindow(05, 20))}}
> {{        // And then fired by receiving a late element (black, 8)}}
> {{        .containsInAnyOrder(KV.of("black", 10));}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 
> minutes.
>  
> The workaround I found is to filter the PCollection to keep only the EARLY 
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{      .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                  c.outputWithTimestamp(}}
> {{                      ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()),}}
> {{                      c.timestamp());}}
> {{                }}}
> {{              }}}
> {{          )}}
> {{      )}}
> {{      .setCoder(}}
> {{          ValueInSingleWindow.Coder.of(}}
> {{              values.getCoder(),}}
> {{              values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{          )}}
> {{      )}}
> {{      .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{      .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{          ParDo.of(}}
> {{              new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{                @ProcessElement}}
> {{                public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{                  c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
> {{                }}}
> {{              }}}
> {{      ));}}
> {{  return filtered;}}
> {{}}}
>  
> And then check all panes of the window with inWindow:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
>  {{   .inWindow(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
>  {{PAssert.that(teamScores)}}
>  {{   .inOnTimePane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 2))}}
>  {{   .inFinalPane(intervalWindow(05, 20))}}
>  {{          .containsInAnyOrder(KV.of("black", 10));}}
>  
> But it's a bit overkill.
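
To make the idea behind the workaround easier to follow, here is a minimal sketch in plain Java with no Beam dependency. It only models the three steps (tag each value with its pane timing, filter on that timing, unwrap the value). The names PaneFilterSketch, Timing and Tagged are hypothetical stand-ins for illustration, loosely mirroring PaneInfo.Timing and ValueInSingleWindow; this is not the Beam API and not the code from the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Beam-free model of the wrap / filter / unwrap workaround.
// All names here are hypothetical and exist only for illustration.
class PaneFilterSketch {

  // Stand-in for the pane timing attached to each fired pane.
  enum Timing { EARLY, ON_TIME, LATE }

  // Stand-in for a value that carries its pane timing with it.
  static final class Tagged<T> {
    final T value;
    final Timing timing;

    Tagged(T value, Timing timing) {
      this.value = value;
      this.timing = timing;
    }
  }

  // Keep only the values emitted with the requested timing, then unwrap them.
  static <T> List<T> filter(List<Tagged<T>> values, Timing timing) {
    return values.stream()
        .filter(v -> v.timing == timing)
        .map(v -> v.value)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Scores of team "black" as in the issue: two early firings,
    // one on-time firing, one firing caused by a late element.
    List<Tagged<Integer>> blackScores = Arrays.asList(
        new Tagged<Integer>(1, Timing.EARLY),
        new Tagged<Integer>(2, Timing.EARLY),
        new Tagged<Integer>(2, Timing.ON_TIME),
        new Tagged<Integer>(10, Timing.LATE));

    System.out.println(filter(blackScores, Timing.EARLY)); // prints [1, 2]
  }
}
```

In the real pipeline the tagging step is what requires the extra ParDo and the explicit coder, which is why a dedicated assertion on early panes would be simpler for test authors.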



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
