[ https://issues.apache.org/jira/browse/BEAM-1287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-1287:
----------------------------------
This Jira ticket has a pull request attached to it, but is still open. Did the
pull request resolve the issue? If so, could you please mark it resolved? This
will help the project have a clear view of its open issues.
> Give new DoFn the ability to output to a particular window
> ----------------------------------------------------------
>
> Key: BEAM-1287
> URL: https://issues.apache.org/jira/browse/BEAM-1287
> Project: Beam
> Issue Type: New Feature
> Components: beam-model, sdk-java-core, sdk-py-core
> Reporter: Kenneth Knowles
> Priority: P3
> Labels: Clarified
>
> The new {{DoFn}} design allows specialized output receivers, such as a
> key-preserving output (the default is non-key-preserving) or a
> non-window-preserving output (the default is window-preserving). This JIRA is
> for the latter, with an emphasis on making the two as analogous as possible.
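> {code:java}
> // Sketch of the proposal: OutputToWindow is a proposed receiver type, not an
> // existing Beam API. value, timestamp, and window stand for whatever the
> // DoFn computes for each element.
> new DoFn<A, B>() {
>   @ProcessElement
>   public void processElement(ProcessContext c, OutputToWindow receiver) {
>     // Emit to an explicitly chosen window instead of inheriting the
>     // input element's window.
>     receiver.outputWithTimestamp(value, timestamp, window);
>   }
>
>   @FinishBundle
>   public void finishBundle(OutputToWindow receiver) {
>     // There is no input element here, so any output must name its
>     // window explicitly.
>   }
> }
> {code}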
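> After this change, window assignment need not be a primitive: it could be an
> ordinary {{DoFn}} that outputs each element to the windows its {{WindowFn}}
> assigns.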
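A minimal sketch of that idea in plain Java, with no Beam dependency. IntervalWindow, OutputToWindow, AssignFixedWindows, and CollectingReceiver are hypothetical stand-ins (not Beam classes) for a window type, the proposed receiver, and fixed-window assignment written as an ordinary per-element function that names its output window.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignmentSketch {
  // Hypothetical stand-in for a window covering [start, end) in epoch millis.
  record IntervalWindow(long start, long end) {}

  // Hypothetical stand-in for the proposed OutputToWindow receiver.
  interface OutputToWindow<T> {
    void outputWithTimestamp(T value, long timestamp, IntervalWindow window);
  }

  // Fixed-window assignment expressed as an ordinary per-element function
  // that names its output window explicitly, instead of a dedicated primitive.
  static final class AssignFixedWindows<T> {
    private final long sizeMillis;

    AssignFixedWindows(long sizeMillis) {
      this.sizeMillis = sizeMillis;
    }

    // Plays the role of @ProcessElement: finds the fixed window containing
    // the element's timestamp and outputs the element into that window.
    void processElement(T value, long timestamp, OutputToWindow<T> receiver) {
      long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
      receiver.outputWithTimestamp(value, timestamp, new IntervalWindow(start, start + sizeMillis));
    }
  }

  // Receiver that records each output and the window it was sent to.
  static final class CollectingReceiver<T> implements OutputToWindow<T> {
    final List<String> emitted = new ArrayList<>();

    @Override
    public void outputWithTimestamp(T value, long timestamp, IntervalWindow window) {
      emitted.add(value + "@" + timestamp + " in [" + window.start() + ", " + window.end() + ")");
    }
  }

  public static void main(String[] args) {
    AssignFixedWindows<String> assign = new AssignFixedWindows<>(60_000L);
    CollectingReceiver<String> out = new CollectingReceiver<>();
    assign.processElement("a", 59_999L, out);
    assign.processElement("b", 60_000L, out);
    out.emitted.forEach(System.out::println);
  }
}
```

Running main prints `a@59999 in [0, 60000)` and then `b@60000 in [60000, 120000)`, so the boundary timestamp lands in the second window.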
> Why is this OK? The primary motivation for keeping windows strongly separated
> is that they yield parallelism, provided we impose no requirement that
> multiple windows for a single key be colocated or linearized. We should be
> able to process a single key with millions of non-merging windows in parallel
> without having to reify the windows (though reifying them isn't _that_ bad).
> That is a major improvement over the vague assumption that keys are the atom
> of parallelism.
> This change does not remove that property, because the property pertains to
> input and state, not to output.
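As a single-process analogy for the parallelism property described above (Java parallel streams standing in for a distributed runner), the sketch sums values for one key across many fixed windows. Event, KeyWindow, and sumPerKeyAndWindow are hypothetical names; the point is only that non-merging windows never interact, so each (key, window) group can be aggregated independently even when every element shares the same key.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PerWindowParallelism {
  // Hypothetical timestamped element: a key, an event time in millis, a value.
  record Event(String key, long timestamp, long value) {}

  // Hypothetical (key, window) pair; a fixed window is identified by its start.
  record KeyWindow(String key, long windowStart) {}

  static final long WINDOW_MILLIS = 1_000L;

  // Sums values per (key, fixed window). With a parallel stream,
  // groupingByConcurrent accumulates into one shared ConcurrentMap from
  // multiple threads; no group depends on any other, even though every
  // element may share a single key.
  static ConcurrentMap<KeyWindow, Long> sumPerKeyAndWindow(List<Event> events) {
    return events.parallelStream()
        .collect(Collectors.groupingByConcurrent(
            e -> new KeyWindow(e.key(), Math.floorDiv(e.timestamp(), WINDOW_MILLIS) * WINDOW_MILLIS),
            Collectors.summingLong(Event::value)));
  }

  public static void main(String[] args) {
    // One key, 10,000 events spaced 10 ms apart, i.e. 100 one-second windows.
    List<Event> events = IntStream.range(0, 10_000)
        .mapToObj(i -> new Event("user-1", i * 10L, 1L))
        .collect(Collectors.toList());
    Map<KeyWindow, Long> sums = sumPerKeyAndWindow(events);
    System.out.println(sums.size() + " windows; window 0 sum = " + sums.get(new KeyWindow("user-1", 0L)));
  }
}
```

Running main prints `100 windows; window 0 sum = 100`. A real runner would distribute the (key, window) groups across workers rather than threads, but the independence argument is the same.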
> The analogy with keys:
> - Stateful DoFn requires the ability to access key-and-window state. For
> some runners, perhaps this does not require colocation. For runners that want
> to do this efficiently and locally, it means some key-and-window colocation
> operation followed by only key-and-window-preserving transforms. So
> outputting to a new window breaks the invariant, just as a non-key-preserving
> transform would. Until we had the new {{DoFn}}, we could not know whether
> non-window-preserving output was used.
> - Non-key-preserving output also breaks any assumption that combined
> aggregates are exactly one per key, and so on. Windows can work the same way.
> - Timestamps are interesting. By analogy with keys, timestamps would simply
> be part of the value and free to change. This works poorly because of
> lateness: an output timestamp that moves backward can fall behind the
> watermark. To avoid digging deeper into changing anything, this proposal
> simply requires that a timestamp be provided; whether it is allowed to be
> late is governed by the same rules as {{outputWithTimestamp}}.
> - It is not clear whether this has uses for merging windows.
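The {{outputWithTimestamp}} rule referred to above is, in Beam's Java SDK, a skew check: an output timestamp may not be earlier than the input element's timestamp minus the DoFn's allowed timestamp skew (zero by default). A minimal plain-Java model of that check follows; checkOutputTimestamp and isAllowed are hypothetical helpers, not Beam methods, and timestamps are plain epoch milliseconds.

```java
class OutputTimestampRule {
  // Models the skew check Beam's Java SDK applies to outputWithTimestamp:
  // an output timestamp must not be earlier than the input element's
  // timestamp minus the allowed skew. All times are epoch milliseconds.
  static void checkOutputTimestamp(long inputTimestamp, long outputTimestamp, long allowedSkewMillis) {
    if (outputTimestamp < inputTimestamp - allowedSkewMillis) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + outputTimestamp
              + ": earlier than input timestamp " + inputTimestamp
              + " minus allowed skew " + allowedSkewMillis);
    }
  }

  // Hypothetical convenience wrapper: true if the output would be accepted.
  static boolean isAllowed(long inputTimestamp, long outputTimestamp, long allowedSkewMillis) {
    try {
      checkOutputTimestamp(inputTimestamp, outputTimestamp, allowedSkewMillis);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isAllowed(1_000L, 1_500L, 0L));  // later than input: true
    System.out.println(isAllowed(1_000L, 900L, 0L));    // earlier, no skew: false
    System.out.println(isAllowed(1_000L, 900L, 200L));  // earlier, within skew: true
  }
}
```

Under this proposal, the window-targeted receiver would apply the same check to the timestamp it is given.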
> This change is entirely backward compatible, but because it removes a
> primitive and requires relatively little effort, it may merit earlier
> consideration.
> No work will begin until it is brought to the dev list.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)