This is cool. Could the watermark estimators be packaged in a module without additional dependencies? I think they would be useful even for projects outside of Beam, so it would be nice if those projects could use this library without depending on the Beam SDK itself.

Jan

On 2/28/20 12:50 AM, Luke Cwik wrote:
The Python SDK also has a RestrictionProvider[1] that covers initial splitting, sizing the restriction, and providing the restriction coder. I believe that keeping one as a provider while fully integrating the other set as "new" DoFn style methods and parameters would be odd.

Kenn, are you also suggesting swapping all the restriction tracker related methods (e.g. NewTracker, GetRestrictionCoder, ...) to the Provider style as well?

I like how the new DoFn style methods and parameters are done because the approach is easily extensible. It could be extended for RestrictionProvider/WatermarkEstimatorProvider via new invokers in the style of the DoFnInvoker.

1: https://github.com/apache/beam/blob/7a4cdece44304acb77a1812390bf35f66f3df0a2/sdks/python/apache_beam/transforms/core.py#L208

On Thu, Feb 27, 2020 at 3:39 PM Kenneth Knowles wrote:

    Great idea.

    Are any of the methods optional or useful on their own? It seems
    like maybe not? If so, a single annotation for a method that
    returns one object providing all of these methods might be
    clearer. Per Boyuan's work - WatermarkEstimatorProvider?

    Kenn

    On Thu, Feb 27, 2020 at 2:43 PM Luke Cwik wrote:

        See this doc[1] and blog[2] for some context about
        SplittableDoFns.

        To support watermark reporting within the Java SDK for
        SplittableDoFns, we need a way for SDF authors to report
        watermark estimates over the element and restriction pair that
        they are processing.

        For UnboundedSources, it was found to be a pain point to ask
        each SDF author to write their own watermark estimation which
        typically prevented re-use. Therefore we would like to have a
        "library" of watermark estimators that help SDF authors
        perform this estimation similar to how there is a "library" of
        restrictions and restriction trackers that SDF authors can
        use. For SDF authors where the existing library doesn't work,
        they can add additional ones that observe timestamps of
        elements or choose to directly report the watermark through a
        "ManualWatermarkEstimator" parameter that can be supplied to
        @ProcessElement methods.
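
A minimal sketch of the two estimator kinds described above: one that observes output timestamps and one that is updated manually. It is plain Python with no Beam dependency. Every class and method name here is a hypothetical stand-in, not the Beam API, and timestamps are modeled as plain floats for illustration.

```python
# Hypothetical names throughout; this is NOT the Beam API.
MIN_TIMESTAMP = float("-inf")


class TimestampObservingEstimator:
    """Estimator that a framework would feed with each output timestamp."""

    def __init__(self, state: float = MIN_TIMESTAMP) -> None:
        self._watermark = state

    def observe_timestamp(self, timestamp: float) -> None:
        # Keep the watermark monotonic even if outputs arrive out of order.
        self._watermark = max(self._watermark, timestamp)

    def current_watermark(self) -> float:
        return self._watermark


class ManualEstimator:
    """Estimator that the SDF author updates explicitly while processing."""

    def __init__(self, state: float = MIN_TIMESTAMP) -> None:
        self._watermark = state

    def set_watermark(self, watermark: float) -> None:
        # Assumption for this sketch: a watermark that moves backwards is an error.
        if watermark < self._watermark:
            raise ValueError("watermark must not move backwards")
        self._watermark = watermark

    def current_watermark(self) -> float:
        return self._watermark


observing = TimestampObservingEstimator()
for ts in [5.0, 3.0, 9.0]:
    observing.observe_timestamp(ts)
print(observing.current_watermark())  # 9.0
```

Rejecting a backwards move in the manual variant is a design choice of this sketch. An actual library could instead ignore such updates.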

        The public-facing portion of the DoFn changes adds three new
        annotations for new DoFn style methods:
        - GetInitialWatermarkEstimatorState: Returns the initial
          watermark state, similar to GetInitialRestriction.
        - GetWatermarkEstimatorStateCoder: Returns a coder compatible
          with the watermark state type, similar to GetRestrictionCoder
          for restrictions returned by GetInitialRestriction.
        - NewWatermarkEstimator: Returns a watermark estimator. This is
          either one that the framework invokes, allowing it to observe
          the timestamps of output records, or a manual watermark
          estimator that can be explicitly invoked to update the
          watermark.
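
A rough sketch of how the three methods could fit together across a checkpoint and resume, written as dependency-free Python rather than Java. All names are hypothetical stand-ins for the annotated methods, not the actual API. Seeding the initial state from the element timestamp, and modeling the state as a single float, are assumptions made only for this illustration.

```python
import struct

# All names below are hypothetical stand-ins, NOT the Beam Java or Python API.


class TimestampCoder:
    """Encodes the estimator state (a single float timestamp) as 8 bytes."""

    def encode(self, state: float) -> bytes:
        return struct.pack(">d", state)

    def decode(self, data: bytes) -> float:
        return struct.unpack(">d", data)[0]


class ObservingEstimator:
    """Tracks the maximum output timestamp seen so far."""

    def __init__(self, state: float) -> None:
        self._watermark = state

    def observe_timestamp(self, timestamp: float) -> None:
        self._watermark = max(self._watermark, timestamp)

    def get_state(self) -> float:
        return self._watermark


class ReadFn:
    """Stand-in for an SDF that exposes the three proposed methods."""

    def get_initial_watermark_estimator_state(self, element_timestamp: float) -> float:
        # Assumption: start from the timestamp of the input element.
        return element_timestamp

    def get_watermark_estimator_state_coder(self) -> TimestampCoder:
        return TimestampCoder()

    def new_watermark_estimator(self, state: float) -> ObservingEstimator:
        return ObservingEstimator(state)


def run_until_checkpoint(fn: ReadFn, state: float, output_timestamps) -> bytes:
    """Processes some outputs, then returns the encoded estimator state."""
    estimator = fn.new_watermark_estimator(state)
    for ts in output_timestamps:
        estimator.observe_timestamp(ts)
    return fn.get_watermark_estimator_state_coder().encode(estimator.get_state())


fn = ReadFn()
coder = fn.get_watermark_estimator_state_coder()
initial = fn.get_initial_watermark_estimator_state(10.0)
checkpoint = run_until_checkpoint(fn, initial, [12.0, 11.0])
resumed = coder.decode(checkpoint)  # state survives the checkpoint
final = coder.decode(run_until_checkpoint(fn, resumed, [11.5, 15.0]))
print(final)  # 15.0
```

The point of the state coder in this sketch is that the estimator itself is never serialized. Only its state crosses the checkpoint, and a fresh estimator is built from it on resume.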

        See [3] for an initial PR with the public facing additions to
        the core Java API related to SplittableDoFn.

        This mirrors a bunch of work that was done by Boyuan within
        the Python SDK [4, 5] but in the style of new DoFn
        parameter/method invocation we have in the Java SDK.

        1: https://s.apache.org/splittable-do-fn
        2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
        3: https://github.com/apache/beam/pull/10992
        4: https://github.com/apache/beam/pull/9794
        5: https://github.com/apache/beam/pull/10375
