This is cool. Could the watermark estimators be packaged in a module
without additional dependencies? I think that it would be useful even
for projects outside of Beam, so it would be nice if they could use
this library without depending on the Beam SDK itself.
Jan
On 2/28/20 12:50 AM, Luke Cwik wrote:
The Python SDK also has a RestrictionProvider [1] that covers initial
splitting, sizing the restriction, and providing the restriction coder.
I believe that keeping one as a provider while fully integrating the
other set as "new" DoFn style methods and parameters would be odd.
Kenn, are you also suggesting swapping all the restriction tracker
related methods (e.g. NewTracker, GetRestrictionCoder, ...) to the
Provider style as well?
I like how the new DoFn style methods and parameters are done because
it is easily extensible. This could be extended for
RestrictionProvider/WatermarkEstimatorProvider via new invokers in the
style of the DoFnInvoker.
1: https://github.com/apache/beam/blob/7a4cdece44304acb77a1812390bf35f66f3df0a2/sdks/python/apache_beam/transforms/core.py#L208
On Thu, Feb 27, 2020 at 3:39 PM Kenneth Knowles wrote:
Great idea.
Are any of the methods optional or useful on their own? It seems
like maybe not? If so, a single annotation that returns an object
providing all of the methods might be clearer. Per Boyuan's
work - WatermarkEstimatorProvider?
Kenn
On Thu, Feb 27, 2020 at 2:43 PM Luke Cwik wrote:
See this doc[1] and blog[2] for some context about
SplittableDoFns.
To support watermark reporting within the Java SDK for
SplittableDoFns, we need a way for SDF authors to report
watermark estimates over the element and restriction pair that
they are processing.
For UnboundedSources, it was found to be a pain point to ask
each SDF author to write their own watermark estimation, which
typically prevented reuse. Therefore we would like to have a
"library" of watermark estimators that help SDF authors
perform this estimation similar to how there is a "library" of
restrictions and restriction trackers that SDF authors can
use. For SDF authors where the existing library doesn't work,
they can add additional ones that observe timestamps of
elements or choose to directly report the watermark through a
"ManualWatermarkEstimator" parameter that can be supplied to
@ProcessElement methods.
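
To make the two flavours above concrete, here is a minimal,
dependency-free sketch. Every type and method name in it
(SketchWatermarkEstimator, TimestampObservingSketch, ManualSketch,
observeTimestamp, setWatermark) is a hypothetical stand-in chosen
for illustration, not the API from the PR. It only assumes that a
watermark is an instant that must never move backwards.

```java
import java.time.Instant;

// Hypothetical types for illustration only; none of these are Beam SDK classes.
final class EstimatorLibraryDemo {
  public static void main(String[] args) {
    // Framework-driven: every output timestamp is fed to the estimator.
    TimestampObservingSketch observing = new TimestampObservingSketch(Instant.EPOCH);
    observing.observeTimestamp(Instant.ofEpochSecond(10));
    observing.observeTimestamp(Instant.ofEpochSecond(5)); // older record, ignored
    System.out.println(observing.currentWatermark());

    // Author-driven: the watermark is reported explicitly.
    ManualSketch manual = new ManualSketch(Instant.EPOCH);
    manual.setWatermark(Instant.ofEpochSecond(30));
    System.out.println(manual.currentWatermark());
  }
}

interface SketchWatermarkEstimator {
  /** Current watermark estimate; implementations never move it backwards. */
  Instant currentWatermark();
}

/** Observes the timestamp of every output record and keeps the maximum. */
final class TimestampObservingSketch implements SketchWatermarkEstimator {
  private Instant watermark;

  TimestampObservingSketch(Instant initial) {
    this.watermark = initial;
  }

  /** Advances the watermark only if the observed timestamp is later. */
  void observeTimestamp(Instant timestamp) {
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}

/** Lets the SDF author set the watermark directly. */
final class ManualSketch implements SketchWatermarkEstimator {
  private Instant watermark;

  ManualSketch(Instant initial) {
    this.watermark = initial;
  }

  /** Rejects any attempt to move the watermark backwards. */
  void setWatermark(Instant candidate) {
    if (candidate.isBefore(watermark)) {
      throw new IllegalArgumentException("watermark must not move backwards");
    }
    watermark = candidate;
  }

  @Override
  public Instant currentWatermark() {
    return watermark;
  }
}
```

The observing variant silently ignores older timestamps while the
manual one rejects them; that difference is a choice made for this
sketch, not something the proposal prescribes.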
The public facing portion of the DoFn changes adds three new
annotations for new DoFn style methods:
GetInitialWatermarkEstimatorState: Returns the initial
watermark state, similar to GetInitialRestriction.
GetWatermarkEstimatorStateCoder: Returns a coder compatible
with the watermark state type, similar to GetRestrictionCoder for
restrictions returned by GetInitialRestriction.
NewWatermarkEstimator: Returns either a watermark estimator
that the framework invokes so that it can observe the
timestamps of output records, or a manual watermark estimator
that can be explicitly invoked to update the watermark.
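
As a rough illustration of how the three methods could fit
together across a checkpoint, here is a self-contained sketch.
The static methods only borrow the annotation names; the class
WatermarkLifecycleSketch, the choice of an epoch-millis long as
the state type, and the 8-byte encoding are all assumptions made
for this example and are not taken from the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins that mirror the three proposed hooks; not the Beam SDK API.
final class WatermarkLifecycleSketch {

  /** Stand-in for GetInitialWatermarkEstimatorState: start at the element's timestamp. */
  static long getInitialWatermarkEstimatorState(long elementTimestampMillis) {
    return elementTimestampMillis;
  }

  /** Stand-in for GetWatermarkEstimatorStateCoder: state as 8 big-endian bytes. */
  static byte[] encodeState(long state) {
    return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
  }

  static long decodeState(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  /** Stand-in for NewWatermarkEstimator: build an estimator from (possibly restored) state. */
  static Estimator newWatermarkEstimator(long state) {
    return new Estimator(state);
  }

  /** Timestamp-observing estimator whose state is simply its current watermark. */
  static final class Estimator {
    private long watermarkMillis;

    Estimator(long state) {
      this.watermarkMillis = state;
    }

    void observeTimestamp(long timestampMillis) {
      watermarkMillis = Math.max(watermarkMillis, timestampMillis);
    }

    long currentWatermark() {
      return watermarkMillis;
    }

    long getState() {
      return watermarkMillis;
    }
  }

  public static void main(String[] args) {
    // First processing attempt: initial state -> estimator -> observe outputs.
    long initial = getInitialWatermarkEstimatorState(1_000L);
    Estimator first = newWatermarkEstimator(initial);
    first.observeTimestamp(4_000L);

    // Checkpoint: the state is encoded, shipped, and decoded elsewhere.
    byte[] checkpoint = encodeState(first.getState());

    // Resume: a fresh estimator picks up where the first one stopped.
    Estimator resumed = newWatermarkEstimator(decodeState(checkpoint));
    System.out.println(resumed.currentWatermark()); // prints 4000
  }
}
```

The point of the sketch is only that the state returned by the
estimator must round-trip through the coder so that a resumed
restriction does not report an older watermark.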
See [3] for an initial PR with the public facing additions to
the core Java API related to SplittableDoFn.
This mirrors a bunch of work that was done by Boyuan within
the Python SDK [4, 5] but in the style of new DoFn
parameter/method invocation we have in the Java SDK.
1: https://s.apache.org/splittable-do-fn
2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
3: https://github.com/apache/beam/pull/10992
4: https://github.com/apache/beam/pull/9794
5: https://github.com/apache/beam/pull/10375