I think a per-key watermark is not just consistent with the model; there's also an argument to be made that it is the correct way to conceive of watermarks in Beam. We currently hold watermarks inside ReduceFnRunner via a WatermarkHold, which is set per key. As a result, the downstream watermark for any key can be meaningfully understood from the upstream watermark plus the hold for that key alone. When elements are rekeyed this gets much more complicated, of course; hence, I imagine, the concepts of a "key lineage" or merely key-preserving transforms.
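To make the arithmetic concrete, here is a minimal sketch of how a per-key output watermark could be derived from the upstream watermark and that key's hold, next to a single watermark taken as the minimum across all keys. All names here (per_key_output_watermark, key_unaware_output_watermark, the "phone-a"/"phone-b" keys) are made up for illustration and are not Beam's API; timestamps are plain integers.

```python
import math

# Hypothetical helpers for illustration only; not part of Beam.
POS_INF = math.inf


def per_key_output_watermark(input_watermark, holds, key):
    """Output watermark for one key: the input watermark capped by that key's hold.

    A key with no hold is limited only by the input watermark.
    """
    return min(input_watermark, holds.get(key, POS_INF))


def key_unaware_output_watermark(input_watermark, holds):
    """Single output watermark: the minimum of the input watermark and every hold."""
    return min([input_watermark, *holds.values()])


input_wm = 100
holds = {"phone-a": 40}  # "phone-a" still buffers data with timestamp 40

print(per_key_output_watermark(input_wm, holds, "phone-a"))  # 40
print(per_key_output_watermark(input_wm, holds, "phone-b"))  # 100
print(key_unaware_output_watermark(input_wm, holds))         # 40
```

In this sketch the hold on "phone-a" only delays "phone-a"; with a single watermark, the same hold delays every key.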
I also think it's reasonable to claim that, in the absence of more information, the output watermark of a key-unaware producer is the minimum over all of its input watermarks and holds (effectively what the current watermark representations use). But it's possible for downstream steps to gain information: e.g. if the input watermark for a GroupByKey is positive infinity, the output watermark per key can advance after those elements are output, independently of other keys.

On Tue, Feb 28, 2017 at 1:58 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:

> Following up on what Kenn said, it seems like the idea of a per-key
> watermark is logically consistent with the Beam model. Each runner
> already chooses how to approximate the model-level concept of the
> per-key watermark.
>
> The list Kenn had could be extended with things like:
> 4. A runner that assumes the watermark for all keys is at -\infty until
>    all data is processed and then jumps to \infty. This would be
>    similar to a Batch runner, for instance.
>
> How watermarks are tracked may even depend on the pipeline -- if there
> is no shuffling of data between keys it may be easier to support a
> per-key watermark.
>
> On Tue, Feb 28, 2017 at 1:50 PM Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > This is a really interesting topic. I think Beam is a good place to
> > have a broader cross-runner discussion of this.
> >
> > I know that you are not the only person skeptical due to
> > trickiness/costliness of implementation.
> >
> > On the other hand, at a semantic level, I think any correct
> > definition will allow a per-PCollection watermark to serve as just a
> > "very bad" per-key watermark. So I would target a definition whereby
> > any of these is acceptable:
> >
> > 1. a runner with no awareness of per-key watermarks
> > 2. a runner that chooses only sometimes to use them (there might be
> >    some user-facing cost/latency tradeoff a la triggers here)
> > 3. a runner that chooses a coarse approximation for tracking key
> >    lineage
> >
> > Can you come up with an example where this is impossible?
> >
> > Developing advanced model features whether or not all runners do (or
> > can) support them is exactly the spirit of Beam, to me, so I am
> > really glad you brought this up here.
> >
> > On Mon, Feb 27, 2017 at 5:59 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > We recently started a discussion on the Flink ML about this
> > > topic: [1]
> > >
> > > The gist of it is that for some use cases tracking the watermark
> > > per-key instead of globally (or rather per partition) can be
> > > useful. Think, for example, of tracking some user data off mobile
> > > phones where the user-id/phone number is the key. For these cases,
> > > one slow key, i.e. a disconnected phone, would slow down the
> > > watermark.
> > >
> > > I'm not saying that this is a good idea, I'm actually skeptical
> > > because the implementation seems quite complicated/costly to me
> > > and I have some doubts about being able to track the watermark
> > > per-key in the sources. It's just that more people seem to be
> > > asking about this lately and I would like to have a discussion
> > > with the clever Beam people because we claim to be at the
> > > forefront of parallel data processing APIs. :-)
> > >
> > > Also note that I'm aware that in the example given above it would
> > > be difficult to find out if one key is slow in the first place and
> > > therefore hold the watermark for that key.
> > >
> > > If you're interested, please have a look at the discussion on the
> > > Flink ML that I linked to above. It's only 4 mails so far and
> > > Jamie gives a nicer explanation of the possible use cases than I'm
> > > doing here. Note that my discussion of feasibility and APIs/key
> > > lineage should also apply to Beam.
> > >
> > > What do you think?
> > >
> > > [1]
> > > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E