[ https://issues.apache.org/jira/browse/BEAM-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523803#comment-16523803 ]
Kenneth Knowles commented on BEAM-4643:
---------------------------------------
The problem is that there is a natural race condition between values arriving
early and values arriving on time.
- Discarding: the last "early" pane can easily contain all the contents, while
the "on time" pane is empty.
- Accumulating: less of a problem, since the on-time pane contains everything
again.
With {{TestStream}} you can control the timing precisely, so I think this
feature makes sense. But in a real pipeline you should be very careful about
treating early and on-time panes differently.
Please do go for a PR!
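The race described above can be illustrated with a minimal plain-Java sketch. This is a toy model, not Beam code: the `PaneRace` class and its `panes` helper are hypothetical. It assumes one key in one window, one early firing followed by the on-time firing, and a second element `b` that arrives either just before the early firing or just after it (but before the watermark).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Beam API) of the panes emitted for one key in one window.
// arrivalsPerFiring.get(i) holds the elements that arrived after firing i-1 and before firing i.
class PaneRace {

    static List<List<String>> panes(List<List<String>> arrivalsPerFiring, boolean accumulating) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> buffered = new ArrayList<>(); // elements the trigger state still holds
        for (List<String> arrivals : arrivalsPerFiring) {
            buffered.addAll(arrivals);
            emitted.add(new ArrayList<>(buffered)); // the pane carries everything buffered
            if (!accumulating) {
                buffered.clear(); // discarding mode: drop what was just emitted
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Firings: [EARLY, ON_TIME].
        // "b" wins the race: it arrives before the early firing.
        List<List<String>> bWins = List.of(List.of("a", "b"), List.of());
        // "b" loses the race: it arrives after the early firing, before the watermark.
        List<List<String>> bLoses = List.of(List.of("a"), List.of("b"));

        System.out.println(panes(bWins, false));  // [[a, b], []]
        System.out.println(panes(bLoses, false)); // [[a], [b]]
        System.out.println(panes(bWins, true));   // [[a, b], [a, b]]
        System.out.println(panes(bLoses, true));  // [[a], [a, b]]
    }
}
```

In discarding mode the on-time pane is `[]` or `[b]` depending on which side of the race `b` lands; in accumulating mode it is `[a, b]` in both cases. Pane-specific assertions in discarding mode are therefore only deterministic when the test, as with {{TestStream}}, controls when elements arrive relative to the firings.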
> Allow to check early panes of a window
> --------------------------------------
>
> Key: BEAM-4643
> URL: https://issues.apache.org/jira/browse/BEAM-4643
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, testing
> Affects Versions: 2.5.0
> Reporter: Logan HAUSPIE
> Assignee: Kenneth Knowles
> Priority: Minor
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> What I would like to do is:
> {code:java}
> PAssert.that(teamScores)
>     .inEarlyPanes(intervalWindow(05, 20))
>     // Window fired twice early: after (black, 1) and after another (black, 1)
>     .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2))
>     .inOnTimePane(intervalWindow(05, 20))
>     // Then fired again on reaching the watermark (no additional data)
>     .containsInAnyOrder(KV.of("black", 2))
>     .inFinalPane(intervalWindow(05, 20))
>     // And then fired on receiving late data (black, 8)
>     .containsInAnyOrder(KV.of("black", 10));
> {code}
> NB: {{intervalWindow(05, 20)}} returns an {{IntervalWindow}} from 5 minutes to
> 20 minutes.
>
> The workaround I found is to filter the PCollection to keep only the elements
> from EARLY panes, using this method:
> {code:java}
> public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {
>   PCollection<T> filtered = values
>       // Attach each element's window and pane info so the filter can inspect the pane timing
>       .apply("Wrap into ValueInSingleWindow for filtering",
>           ParDo.of(
>               new DoFn<T, ValueInSingleWindow<T>>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c, BoundedWindow window) {
>                   c.outputWithTimestamp(
>                       ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()),
>                       c.timestamp());
>                 }
>               }))
>       .setCoder(
>           ValueInSingleWindow.Coder.of(
>               values.getCoder(),
>               values.getWindowingStrategy().getWindowFn().windowCoder()))
>       // Keep only elements whose pane has the requested timing (EARLY, ON_TIME, LATE, ...)
>       .apply(Filter.by(a -> a.getPane().getTiming() == timing))
>       // Unwrap back to the bare element, preserving its timestamp
>       .apply("Unwrap from ValueInSingleWindow for filtering",
>           ParDo.of(
>               new DoFn<ValueInSingleWindow<T>, T>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                   c.outputWithTimestamp(c.element().getValue(), c.timestamp());
>                 }
>               }));
>   return filtered;
> }
> {code}
>
> And then check all panes of the window with {{inWindow}}:
> {code:java}
> PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))
>     .inWindow(intervalWindow(05, 20))
>     .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));
> PAssert.that(teamScores)
>     .inOnTimePane(intervalWindow(05, 20))
>     .containsInAnyOrder(KV.of("black", 2))
>     .inFinalPane(intervalWindow(05, 20))
>     .containsInAnyOrder(KV.of("black", 10));
> {code}
>
> But it's a bit overkill.
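The check performed by the workaround above (and by the proposed early-pane assertion) can be modelled without Beam. This is a toy sketch under stated assumptions: one key ("black") in one window, accumulating mode with a per-key sum (the mode that the expected values 1, 2, 2 and 10 imply), and pane timings supplied by hand. The `Timing` enum only mirrors the constant names of Beam's `PaneInfo.Timing`; `EarlyPaneCheck`, `Pane`, `firePanes`, `valuesWithTiming` and `containsInAnyOrder` are hypothetical helpers, not Beam API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT the Beam API) of the "black" team's panes in one window.
class EarlyPaneCheck {

    // Same constant names as Beam's PaneInfo.Timing (UNKNOWN omitted).
    enum Timing { EARLY, ON_TIME, LATE }

    // One emitted pane: its timing and the per-key sum it carries.
    static final class Pane {
        final Timing timing;
        final int sum;
        Pane(Timing timing, int sum) { this.timing = timing; this.sum = sum; }
    }

    // Accumulating mode: each pane carries the running sum of all elements seen so far.
    static List<Pane> firePanes(List<Timing> timings, List<List<Integer>> arrivals) {
        List<Pane> panes = new ArrayList<>();
        int running = 0;
        for (int i = 0; i < timings.size(); i++) {
            for (int v : arrivals.get(i)) running += v;
            panes.add(new Pane(timings.get(i), running));
        }
        return panes;
    }

    // Analogue of filter(teamScores, timing): keep only values from panes with that timing.
    static List<Integer> valuesWithTiming(List<Pane> panes, Timing timing) {
        List<Integer> out = new ArrayList<>();
        for (Pane p : panes) if (p.timing == timing) out.add(p.sum);
        return out;
    }

    // Order-insensitive multiset comparison, in the spirit of containsInAnyOrder.
    static boolean containsInAnyOrder(List<Integer> actual, List<Integer> expected) {
        return counts(actual).equals(counts(expected));
    }

    private static Map<Integer, Integer> counts(List<Integer> xs) {
        Map<Integer, Integer> m = new HashMap<>();
        for (int x : xs) m.merge(x, 1, Integer::sum);
        return m;
    }

    public static void main(String[] args) {
        // Two early firings after (black, 1) each, an on-time firing with no new data,
        // then a late firing after (black, 8).
        List<Pane> panes = firePanes(
            List.of(Timing.EARLY, Timing.EARLY, Timing.ON_TIME, Timing.LATE),
            List.of(List.of(1), List.of(1), List.of(), List.of(8)));

        System.out.println(valuesWithTiming(panes, Timing.EARLY));   // [1, 2]
        System.out.println(valuesWithTiming(panes, Timing.ON_TIME)); // [2]
        System.out.println(valuesWithTiming(panes, Timing.LATE));    // [10]
        System.out.println(containsInAnyOrder(valuesWithTiming(panes, Timing.EARLY), List.of(2, 1))); // true
    }
}
```

Filtering by timing first and then comparing as a multiset is what lets a single order-insensitive assertion cover several early panes at once; the on-time and late panes each hold exactly one value here, so they can be checked individually.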
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)