[
https://issues.apache.org/jira/browse/BEAM-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761003#comment-16761003
]
Maximilian Michels commented on BEAM-4643:
------------------------------------------
Resolving this for 2.8.0, the release in which this first appeared.
> Allow to check early panes of a window
> --------------------------------------
>
> Key: BEAM-4643
> URL: https://issues.apache.org/jira/browse/BEAM-4643
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, testing
> Affects Versions: 2.5.0
> Reporter: Logan HAUSPIE
> Assignee: Logan HAUSPIE
> Priority: Minor
> Fix For: 2.8.0
>
> Original Estimate: 24h
> Time Spent: 2h 10m
> Remaining Estimate: 21h 50m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{ .inEarlyPanes(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window fired early twice: (black, 1) + (black, 1)}}
> {{ .inOnTimePane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 2)) // Then fired again when the watermark is reached (no additional data)}}
> {{ .inFinalPane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 10)); // And then fired on receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20
> minutes.
>
> The workaround I found is to filter the PCollection to keep only the EARLY
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{ PCollection<T> filtered = values}}
> {{ .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{ ParDo.of(}}
> {{ new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{ @ProcessElement}}
> {{ public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{ c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), c.timestamp());}}
> {{ }}}
> {{ }}}
> {{ )}}
> {{ )}}
> {{ .setCoder(}}
> {{ ValueInSingleWindow.Coder.of(}}
> {{ values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{ )}}
> {{ )}}
> {{ .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{ .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{ ParDo.of(}}
> {{ new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{ @ProcessElement}}
> {{ public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{ c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
> {{ }}}
> {{ }}}
> {{ ));}}
> {{ return filtered;}}
> {{ }}}
>
> And then check all panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{ .inWindow(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{ .inOnTimePane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 2))}}
> {{ .inFinalPane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 10));}}
>
> But it's a bit overkill.
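The expected values in the description can be reproduced with a plain-Java sketch that needs no Beam pipeline. It is not Beam code: `Timing`, `PaneValue` and `AccumulatingWindow` are hypothetical stand-ins (only the EARLY/ON_TIME/LATE names mirror PaneInfo.Timing). It assumes that scores are summed per key and that panes accumulate, which is what the expected values in the description suggest. The `filter` method mirrors the wrap, filter, unwrap idea of the workaround.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PaneTimingSketch {

    /** Hypothetical stand-in for Beam's PaneInfo.Timing (not the Beam type). */
    public enum Timing { EARLY, ON_TIME, LATE }

    /** Hypothetical stand-in for a value tagged with the timing of its pane. */
    public static final class PaneValue {
        final int value;
        final Timing timing;

        PaneValue(int value, Timing timing) {
            this.value = value;
            this.timing = timing;
        }
    }

    /** Running sum for one key in one window; each firing emits one pane. */
    public static final class AccumulatingWindow {
        private int sum = 0;
        private final List<PaneValue> panes = new ArrayList<>();

        void add(int score) {
            sum += score;
        }

        // Assumption: accumulating mode, so every pane carries the sum so far.
        void fire(Timing timing) {
            panes.add(new PaneValue(sum, timing));
        }

        List<PaneValue> panes() {
            return panes;
        }
    }

    /** Keeps only the values emitted with the given timing, then unwraps them. */
    public static List<Integer> filter(List<PaneValue> panes, Timing timing) {
        return panes.stream()
            .filter(p -> p.timing == timing)
            .map(p -> p.value)
            .collect(Collectors.toList());
    }

    /** Replays the scenario from the description for the key "black". */
    public static List<PaneValue> replayScenario() {
        AccumulatingWindow window = new AccumulatingWindow();
        window.add(1);
        window.fire(Timing.EARLY);   // first early firing
        window.add(1);
        window.fire(Timing.EARLY);   // second early firing
        window.fire(Timing.ON_TIME); // watermark reached, no additional data
        window.add(8);
        window.fire(Timing.LATE);    // late data (black, 8)
        return window.panes();
    }

    public static void main(String[] args) {
        List<PaneValue> panes = replayScenario();
        System.out.println("EARLY:   " + filter(panes, Timing.EARLY));
        System.out.println("ON_TIME: " + filter(panes, Timing.ON_TIME));
        System.out.println("LATE:    " + filter(panes, Timing.LATE));
    }
}
```

Under these assumptions the early panes hold 1 and 2, the on-time pane holds 2, and the late pane holds 10, matching the values asserted in the description.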