I would like to discuss a minor extension to the Beam model.
Beam places very few restrictions on what can be in a bundle; in
particular, a bundle might contain records for many different windows. This
was an explicit decision as bundling primarily exists for performance
reasons and we found that limiting bundling based on windows or timestamps
often led to severe performance problems. However, it sometimes makes
finishBundle hard to use.
I've seen multiple cases where users maintain some state in their DoFn that
needs finalizing (e.g. writing to an external service) in finishBundle.
Often users end up keeping lists of all windows seen in the bundle so they
can be processed separately (or, not realizing that there can be multiple
windows, they end up writing incorrect code).
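
To make that bookkeeping concrete, here is a minimal sketch of the pattern in plain Java. It deliberately does not use the Beam API: PerWindowBuffer, its two methods, and the use of a String as a window label are all hypothetical stand-ins for a DoFn's instance state, processElement, finishBundle, and a BoundedWindow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free model of the state a DoFn might keep for one bundle.
class PerWindowBuffer {
    // One buffer per window seen in the current bundle, in first-seen order.
    private final Map<String, List<String>> buffers = new LinkedHashMap<>();

    // Stand-in for processElement: remember which window the record belongs to.
    void processElement(String element, String window) {
        buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }

    // Stand-in for finishBundle: finalize each window separately, then reset.
    // Returns one "window=[records]" line per window instead of calling a service.
    List<String> finishBundle() {
        List<String> flushed = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
            flushed.add(e.getKey() + "=" + e.getValue());
        }
        buffers.clear();
        return flushed;
    }
}
```

The incorrect variant mentioned above corresponds to replacing the map with a single list, which silently merges records from different windows into one flush.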
The lack of a window also means that we don't currently support injecting
an OutputReceiver into finishBundle, as there's no good way of knowing
which window the output should be placed in.
I would like to propose adding a way for finishBundle to inspect the
window, either directly (via a BoundedWindow parameter) or indirectly (via
an OutputReceiver parameter). If finishBundle declares either parameter, we
will execute it once per window in the bundle. Otherwise, we will execute it
once at the end of the bundle, as before. This behavior is backwards compatible,
as previously these parameters were disallowed in finishBundle.
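
The dispatch rule can be sketched as a small, Beam-free model. This is only an illustration of the semantics described above, not the runner code from the PR: FinishBundleDispatch, calls, and the String window labels are hypothetical names, and the behavior for a bundle with no records is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the proposed rule; not the Beam runner implementation.
class FinishBundleDispatch {
    // Returns the window passed to each finishBundle call. A null entry means a
    // call that receives no window, which is today's behavior.
    static List<String> calls(List<String> recordWindows, boolean observesWindow) {
        List<String> invocations = new ArrayList<>();
        if (!observesWindow) {
            invocations.add(null); // once at the end of the bundle, as before
            return invocations;
        }
        // Distinct windows in the bundle, in first-seen order: one call each.
        // Assumption: a bundle with no records produces no window-aware calls.
        Set<String> distinct = new LinkedHashSet<>(recordWindows);
        invocations.addAll(distinct);
        return invocations;
    }
}
```

For a bundle whose records fall in windows w1, w2, w1, this model makes two calls (for w1 and w2) when the window is observed, and a single window-less call otherwise.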
Note that this is similar to something Beam already does in processElement.
A single element can exist in multiple windows; however, if
processElement "observes" the window, then Beam will call processElement
once per window.
In Java, the user code could look like this:
  DoFn<> {
    ...
    @FinishBundle
    public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
      // This finishBundle will be called once per window in the bundle,
      // since it has a parameter that observes the window.
    }
  }
This PR <https://github.com/apache/beam/pull/11600> shows an implementation
of this extension for the Java SDK.
Thoughts?
Reuven