Really interesting! Implementing the watermark correctly has been a common struggle for IO authors, to the point that some IOs still have issues with it. So +1 for this, in particular if we can reuse common patterns. I was not aware of Boyuan's work on this, really nice.
One aspect I have always been confused about since reading the SDF proposal documents is whether we could end up with a single API for both Bounded and Unbounded IO by treating a BoundedSDF as a special case of an UnboundedSDF. Could WatermarkEstimator help in this direction? One quick case I can think of is making the current HBaseIO SDF work in an unbounded manner, for example to 'watch and read new tables'.

On Thu, Feb 27, 2020 at 11:43 PM Luke Cwik <[email protected]> wrote:

> See this doc[1] and blog[2] for some context about SplittableDoFns.
>
> To support watermark reporting within the Java SDK for SplittableDoFns, we
> need a way for SDF authors to report watermark estimates over the
> element and restriction pair that they are processing.
>
> For UnboundedSources, it was found to be a pain point to ask each SDF
> author to write their own watermark estimation, which typically prevented
> re-use. Therefore we would like to have a "library" of watermark estimators
> that help SDF authors perform this estimation, similar to how there is a
> "library" of restrictions and restriction trackers that SDF authors can
> use. For SDF authors where the existing library doesn't work, they can add
> additional ones that observe timestamps of elements or choose to directly
> report the watermark through a "ManualWatermarkEstimator" parameter that
> can be supplied to @ProcessElement methods.
>
> The public facing portion of the DoFn changes adds three new annotations
> for new DoFn style methods:
> GetInitialWatermarkEstimatorState: Returns the initial watermark state,
> similar to GetInitialRestriction.
> GetWatermarkEstimatorStateCoder: Returns a coder compatible with the watermark
> state type, similar to GetRestrictionCoder for restrictions returned by
> GetInitialRestriction.
> NewWatermarkEstimator: Returns a watermark estimator that either the
> framework invokes allowing it to observe the timestamps of output records
> or a manual watermark estimator that can be explicitly invoked to update
> the watermark.
>
> See [3] for an initial PR with the public facing additions to the core
> Java API related to SplittableDoFn.
>
> This mirrors a bunch of work that was done by Boyuan within the Python SDK
> [4, 5] but in the style of new DoFn parameter/method invocation we have in
> the Java SDK.
>
> 1: https://s.apache.org/splittable-do-fn
> 2: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> 3: https://github.com/apache/beam/pull/10992
> 4: https://github.com/apache/beam/pull/9794
> 5: https://github.com/apache/beam/pull/10375
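
To make sure I understand the two estimator styles described above (one that observes the timestamps of output records, one that is updated explicitly), here is a rough standalone sketch. It does not use the Beam API from the PR: every type and method name below (WatermarkEstimatorSketch, MonotonicEstimatorSketch, ManualEstimatorSketch, observeTimestamp, setWatermark) is hypothetical, and java.time.Instant is only a stand-in for whatever timestamp type the SDK uses. The "initial state" passed to each constructor plays the role of the value that GetInitialWatermarkEstimatorState would return.

```java
import java.time.Instant;

// Hypothetical stand-in for illustration only; this is NOT the Beam interface.
interface WatermarkEstimatorSketch {
    Instant currentWatermark();
}

// Timestamp-observing style: something outside the user code (the framework,
// in the proposal) feeds it the timestamp of each output record.
final class MonotonicEstimatorSketch implements WatermarkEstimatorSketch {
    private Instant watermark;

    MonotonicEstimatorSketch(Instant initialState) {
        this.watermark = initialState;
    }

    // Advance only when a later timestamp is seen, so the estimate never moves backwards.
    void observeTimestamp(Instant timestamp) {
        if (timestamp.isAfter(watermark)) {
            watermark = timestamp;
        }
    }

    @Override
    public Instant currentWatermark() {
        return watermark;
    }
}

// Manual style: the processing code reports the watermark explicitly.
final class ManualEstimatorSketch implements WatermarkEstimatorSketch {
    private Instant watermark;

    ManualEstimatorSketch(Instant initialState) {
        this.watermark = initialState;
    }

    // Assumption of this sketch: moving the watermark backwards is rejected.
    void setWatermark(Instant newWatermark) {
        if (newWatermark.isBefore(watermark)) {
            throw new IllegalArgumentException("watermark must not move backwards");
        }
        watermark = newWatermark;
    }

    @Override
    public Instant currentWatermark() {
        return watermark;
    }
}

final class WatermarkSketchDemo {
    public static void main(String[] args) {
        MonotonicEstimatorSketch observing =
            new MonotonicEstimatorSketch(Instant.ofEpochMilli(0));
        // Out-of-order output timestamps: the estimate keeps the largest one seen.
        observing.observeTimestamp(Instant.ofEpochMilli(50));
        observing.observeTimestamp(Instant.ofEpochMilli(20));
        System.out.println("observing: " + observing.currentWatermark());

        ManualEstimatorSketch manual =
            new ManualEstimatorSketch(Instant.ofEpochMilli(0));
        // The reader decides the watermark itself, e.g. from source metadata.
        manual.setWatermark(Instant.ofEpochMilli(100));
        System.out.println("manual: " + manual.currentWatermark());
    }
}
```

If this matches the intent, the piece that interests me for the bounded/unbounded question is that both styles start from a serializable initial state, which is what would have to be checkpointed and restored.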
