Logan HAUSPIE created BEAM-4643:
-----------------------------------
Summary: Allow to check early panes of a window
Key: BEAM-4643
URL: https://issues.apache.org/jira/browse/BEAM-4643
Project: Beam
Issue Type: Improvement
Components: sdk-java-core, testing
Affects Versions: 2.5.0
Reporter: Logan HAUSPIE
Assignee: Kenneth Knowles
What I would like to do is:
{{PAssert.that(teamScores)}}
{{    .inEarlyPanes(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window triggered early 2 times: (black, 1) + (black, 1)}}
{{    .inOnTimePane(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 2)) // Then triggered again when the watermark is reached (no additional data)}}
{{    .inFinalPane(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 10)); // And then fired on receiving late data (black, 8)}}
NB: intervalWindow(05, 20) returns an IntervalWindow spanning minute 5 to minute 20.
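To make the expected values in the assertions above concrete, here is a minimal plain-Java sketch of how the panes of that window would accumulate. It does not use Beam at all: {{PaneModel}}, {{AccumulatingWindow}}, {{fire}} and {{scoresWithTiming}} are hypothetical names invented for illustration, the {{Timing}} enum only mirrors the names of Beam's PaneInfo.Timing values, and it assumes the window uses accumulating panes (which is what the expected totals imply).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PaneModel {
    /** Hypothetical stand-in for the PaneInfo.Timing values used in this issue. */
    enum Timing { EARLY, ON_TIME, LATE }

    /** One trigger firing: the accumulated score emitted and the pane's timing. */
    static final class Pane {
        final String team;
        final int score;
        final Timing timing;

        Pane(String team, int score, Timing timing) {
            this.team = team;
            this.score = score;
            this.timing = timing;
        }
    }

    /** Models one team in one window with accumulating panes (an assumption). */
    static final class AccumulatingWindow {
        private final String team;
        private int total = 0;
        private final List<Pane> panes = new ArrayList<>();

        AccumulatingWindow(String team) {
            this.team = team;
        }

        /** Adds a score to the running total of the window. */
        void add(int score) {
            total += score;
        }

        /** Records a pane carrying the running total at the moment of firing. */
        void fire(Timing timing) {
            panes.add(new Pane(team, total, timing));
        }

        /** Returns the scores of all panes that fired with the given timing. */
        List<Integer> scoresWithTiming(Timing timing) {
            return panes.stream()
                .filter(p -> p.timing == timing)
                .map(p -> p.score)
                .collect(Collectors.toList());
        }
    }

    /** Replays the scenario described in the issue. */
    static AccumulatingWindow example() {
        AccumulatingWindow window = new AccumulatingWindow("black");
        window.add(1);
        window.fire(Timing.EARLY);   // first early firing: total is 1
        window.add(1);
        window.fire(Timing.EARLY);   // second early firing: total is 2
        window.fire(Timing.ON_TIME); // watermark reached, no additional data: still 2
        window.add(8);
        window.fire(Timing.LATE);    // late element (black, 8): total is 10
        return window;
    }

    public static void main(String[] args) {
        AccumulatingWindow window = example();
        System.out.println("EARLY   " + window.scoresWithTiming(Timing.EARLY));   // [1, 2]
        System.out.println("ON_TIME " + window.scoresWithTiming(Timing.ON_TIME)); // [2]
        System.out.println("LATE    " + window.scoresWithTiming(Timing.LATE));    // [10]
    }
}
```

In this model the EARLY panes are the ones the proposed {{inEarlyPanes}} would select, and the LATE pane is the last one fired, which is what the {{inFinalPane}} assertion checks in the scenario above.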
The workaround I found is to filter the PCollection to keep only the EARLY
elements with this method:
{{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
{{    PCollection<T> filtered = values}}
{{        .apply("Wrap into ValueInSingleWindow for filtering",}}
{{            ParDo.of(}}
{{                new DoFn<T, ValueInSingleWindow<T>>() {}}
{{                    @ProcessElement}}
{{                    public void processElement(ProcessContext c, BoundedWindow window) {}}
{{                        c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), c.timestamp());}}
{{                    }}}
{{                }}}
{{            )}}
{{        )}}
{{        .setCoder(}}
{{            ValueInSingleWindow.Coder.of(}}
{{                values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()}}
{{            )}}
{{        )}}
{{        .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
{{        .apply("Unwrap from ValueInSingleWindow for filtering",}}
{{            ParDo.of(}}
{{                new DoFn<ValueInSingleWindow<T>, T>() {}}
{{                    @ProcessElement}}
{{                    public void processElement(ProcessContext c, BoundedWindow window) {}}
{{                        c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
{{                    }}}
{{                }}}
{{            ));}}
{{    return filtered;}}
{{}}}
I then check all panes of the window on the filtered PCollection:
{{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
{{    .inWindow(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
{{PAssert.that(teamScores)}}
{{    .inOnTimePane(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 2))}}
{{    .inFinalPane(intervalWindow(05, 20))}}
{{    .containsInAnyOrder(KV.of("black", 10));}}
But it's a bit overkill.