UnboundedSources and SplittableDoFns report watermarks, which the runner uses to compute how far the watermark could advance if it processed some outstanding work. But it is always up to the runner to choose when the watermark advances. The runner could process each work item in watermark priority order and advance the watermark in small increments, or it could process many work items and then advance the watermark a lot. (Note that the BoundedSource API doesn't allow for reporting the watermark; it starts at Beam's concept of START OF TIME and advances in one step to Beam's concept of END OF TIME.)
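
To make the two strategies concrete, here is a toy model in plain Python. It is only a sketch and not Beam's API: WorkItem, candidate_watermark, run_incremental and run_batched are made-up names, the infinities stand in for Beam's START OF TIME / END OF TIME, and it assumes the watermark can never pass the smallest watermark reported by outstanding work.

```python
from dataclasses import dataclass

# Toy model only; none of these names exist in Beam.
MIN_TIMESTAMP = float("-inf")  # stand-in for Beam's START OF TIME
MAX_TIMESTAMP = float("inf")   # stand-in for Beam's END OF TIME


@dataclass
class WorkItem:
    name: str
    reported_watermark: float  # watermark reported for this outstanding work


def candidate_watermark(outstanding):
    """How far the watermark could advance given the outstanding work."""
    if not outstanding:
        return MAX_TIMESTAMP
    return min(item.reported_watermark for item in outstanding)


def run_incremental(items):
    """Process in watermark priority order, advancing after every item."""
    outstanding = sorted(items, key=lambda item: item.reported_watermark)
    advances = []
    while outstanding:
        outstanding.pop(0)  # "process" the oldest work item
        advances.append(candidate_watermark(outstanding))
    return advances


def run_batched(items):
    """Process all items first, then advance the watermark in one step."""
    outstanding = list(items)
    outstanding.clear()  # "process" everything before advancing
    return [candidate_watermark(outstanding)]


items = [WorkItem("a", 10), WorkItem("b", 20), WorkItem("c", 30)]
print(run_incremental(items))  # three small advances
print(run_batched(items))      # one large advance
```

Both runs finish at the same final watermark; they differ only in how many intermediate values downstream transforms get to observe. The batched run also resembles the bounded-source case, where the watermark jumps from MIN_TIMESTAMP to MAX_TIMESTAMP in a single step.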

You might be able to write what you want with an event-time timer. Kenn wrote (2?) blog posts on state and timers that have some pretty good explanations and examples.

On Tue, Apr 9, 2019 at 2:27 PM Pablo Estrada <pabl...@google.com> wrote:

> hi Luke,
> thanks for the prompt reply : )
>
> That makes sense. I think I'll go back to my cave to read a bunch about
> streaming. : )
>
> I was looking for this to try to write a sequence generator for Python in
> streaming, and I was trying to debug what was going on. I was trying to
> allow the DoFn to receive a watermark reported by the upstream source. (...
> does that answer "which watermark?"... I am not sure that it does... but
> maybe..).
>
> Do you think it's a reasonable use case for DoFns to know what the
> upstream watermark is?
> I hope that makes at least some sense... : )
>
> If it doesn't make sense, feel free to ignore, and I'll go do my readings.
> Thanks!
> -P.
>
> On Tue, Apr 9, 2019 at 1:44 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> WatermarkReporterParam is about reporting the watermark. The main use case
>> is for SplittableDoFns to be able to report the data watermark.
>>
>> The watermark is per input and output of a DoFn. Also, each bundle being
>> processed has its local watermarks while the runner computes the global
>> watermark. The runner's watermark could be per key, per key range, or
>> global across all keys.
>>
>> There is no runner-agnostic way to read the watermark today. Is there a
>> use case you are targeting that would benefit from having access to the
>> watermark (also, which watermark?)?
>>
>>
>> On Tue, Apr 9, 2019 at 1:28 PM Pablo Estrada <pabl...@google.com> wrote:
>>
>>> I am experimenting with state / timers in Python. As I look at the
>>> DoFnProcessParams[1], I see that it's possible for a DoFn to receive
>>> several arguments (e.g. Timers, Side Inputs, etc). Also the Watermark via
>>> WatermarkReporterParam.
>>>
>>> I see that this parameter is not handled by runners when filling up the
>>> arguments for a DoFn[2][3]. So, as far as I can tell, DoFns are not
>>> currently able to get the watermark.
>>>
>>> Is this a bug, or is it intentional? Perhaps there's another way to find
>>> out the watermark for a DoFn?
>>>
>>> Best
>>> -P.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L381-L390
>>>
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L477-L488
>>> [3]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L605-L620
>>>
>>