Reuven, can you please elaborate a little on that? Why do you need a watermark per iteration? Letting the watermark progress as soon as all the keys that arrived before the upstream watermark have terminated the cycle seems like a valid definition, without the need to make the watermark multidimensional. Yes, it introduces (possibly unbounded) latency in downstream processing, but that is probably something that should be expected. The unboundedness of the latency can be limited by either a fixed timeout or a maximum number of iterations.
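A minimal sketch of the one-dimensional watermark definition proposed above, written as plain Java rather than against any Beam API. The class `IterationWatermarkHold` and its methods are hypothetical. The sketch assumes each element is registered once when it first enters the loop, and that a fixed iteration cap (rather than a timeout) bounds the latency.

```java
import java.util.TreeMap;

/**
 * Hypothetical helper, not a Beam API: holds the output watermark of an
 * iteration loop until every element that entered the loop at or before the
 * upstream watermark has left it.
 */
final class IterationWatermarkHold {
  private final int maxIterations;
  // Event timestamp -> number of elements with that timestamp still in the loop.
  private final TreeMap<Long, Integer> inFlight = new TreeMap<>();
  private long upstreamWatermark = Long.MIN_VALUE;

  IterationWatermarkHold(int maxIterations) {
    this.maxIterations = maxIterations;
  }

  /** Call once when an element first enters the loop (not on every pass). */
  void enter(long timestamp) {
    inFlight.merge(timestamp, 1, Integer::sum);
  }

  /**
   * Call when an element finishes a pass; {@code iteration} is the 0-based
   * index of that pass. Returns true if the element must be fed back, false
   * if it leaves the loop because it converged or reached the iteration cap.
   */
  boolean finishPass(long timestamp, int iteration, boolean converged) {
    boolean feedBack = !converged && iteration + 1 < maxIterations;
    if (!feedBack) {
      // Decrement the count for this timestamp; drop the entry when it hits zero.
      inFlight.computeIfPresent(timestamp, (t, n) -> n == 1 ? null : n - 1);
    }
    return feedBack;
  }

  /** The upstream (input) watermark only moves forward. */
  void advanceUpstream(long watermark) {
    upstreamWatermark = Math.max(upstreamWatermark, watermark);
  }

  /** Upstream watermark, held back at the oldest timestamp still in the loop. */
  long outputWatermark() {
    return inFlight.isEmpty()
        ? upstreamWatermark
        : Math.min(upstreamWatermark, inFlight.firstKey());
  }
}
```

The hold is keyed only by event timestamp, so the watermark stays one-dimensional. Successive iterations of the same element do not get their own watermark dimension, and that is the trade-off being argued for here.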

On 6/23/21 8:39 PM, Reuven Lax wrote:
This was explored in the past, though the design started getting very complex (watermarks of unbounded dimension, where each iteration has its own watermark dimension). At the time, the exploration petered out.

On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský wrote:

    Hi,

    I'd like to discuss a very rough idea. I didn't walk through all the
    corner cases and the whole idea has a lot of rough edges, so please
    bear with me. I was thinking about non-IO applications of splittable
    DoFn, and the main idea - and why it is called splittable - is that
    it can handle unbounded outputs per element. Then I was thinking
    about what can generate unbounded outputs per element _without
    reading from an external source_ (as that would be an IO
    application) - and then I realized that the data can - at least
    theoretically - come from a downstream transform. It would have to
    be passed over an RPC (probably gRPC) connection, and it would
    probably require some sort of service discovery - as the feedback
    loop would have to be correctly targeted based on key - and so on
    (those are the rough edges).
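To make the "targeted based on key" requirement concrete, here is a minimal sketch of the routing that the feedback sender and the upstream splittable DoFn would both have to agree on. `FeedbackRouter`, the shard count, and the endpoint strings are hypothetical. A static in-memory map stands in for real service discovery, and the sketch assumes keys never move between workers, which real runners do not guarantee. It also relies on a hash that is stable across processes. That holds for `String.hashCode` but not for every key type, so a real implementation would more likely hash the key's encoded bytes.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical router, not a Beam or gRPC API: maps a key to the endpoint of
 * the upstream worker responsible for it, so feedback for a key reaches the
 * splittable DoFn instance that handles that key.
 */
final class FeedbackRouter {
  private final int numShards;
  // Shard index -> endpoint such as "host:port"; stands in for service discovery.
  private final Map<Integer, String> endpoints = new HashMap<>();

  FeedbackRouter(int numShards) {
    this.numShards = numShards;
  }

  void register(int shard, String endpoint) {
    endpoints.put(shard, endpoint);
  }

  /** Both ends of the feedback loop must use this same deterministic function. */
  int shardFor(Object key) {
    // floorMod keeps the shard index non-negative for negative hash codes.
    return Math.floorMod(key.hashCode(), numShards);
  }

  String endpointFor(Object key) {
    int shard = shardFor(key);
    String endpoint = endpoints.get(shard);
    if (endpoint == null) {
      throw new IllegalStateException("no endpoint registered for shard " + shard);
    }
    return endpoint;
  }
}
```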

    But supposing this can be solved, what iterations actually mean is
    that we have a side channel that comes from downstream processing,
    and we need a watermark estimator for this channel that is able to
    hold the watermark back until the very last element (at a certain
    watermark) finishes the iteration. The idea is that we could then -
    in theory - create an Iteration PTransform that would take another
    PTransform (probably something like
    PTransform<PCollection<KV<K, V>>, PCollection<KV<K, IterationResult<K, V>>>>),
    where IterationResult<K, V> would contain the original KV<K, V> and
    a stopping condition (true or false). By creating the feedback loop
    from the output of this PTransform, we could actually implement this
    without requiring any support from runners.
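The intended semantics of the proposed Iteration transform can be simulated in a single process, with no runner, RPC, or watermark involved. In this sketch `IterationResult` mirrors the type described above, `Map.Entry` stands in for Beam's `KV`, and `LocalIteration.run` with its `maxIterations` cap is a hypothetical name. Elements whose stopping condition is false are fed back into the body; the rest are emitted.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the proposed IterationResult<K, V>. */
final class IterationResult<K, V> {
  final Map.Entry<K, V> kv; // the KV<K, V> produced by this pass
  final boolean stop;       // stopping condition: true ends the loop for this element

  IterationResult(Map.Entry<K, V> kv, boolean stop) {
    this.kv = kv;
    this.stop = stop;
  }
}

/** Single-process simulation of the feedback loop; no runner or RPC involved. */
final class LocalIteration {
  // An element waiting for its next pass, with the 0-based index of that pass.
  private static final class Pending<K, V> {
    final Map.Entry<K, V> kv;
    final int iteration;

    Pending(Map.Entry<K, V> kv, int iteration) {
      this.kv = kv;
      this.iteration = iteration;
    }
  }

  static <K, V> List<IterationResult<K, V>> run(
      List<Map.Entry<K, V>> input,
      Function<Map.Entry<K, V>, IterationResult<K, V>> body,
      int maxIterations) {
    Deque<Pending<K, V>> loop = new ArrayDeque<>();
    for (Map.Entry<K, V> kv : input) {
      loop.add(new Pending<>(kv, 0));
    }
    List<IterationResult<K, V>> out = new ArrayList<>();
    while (!loop.isEmpty()) {
      Pending<K, V> next = loop.poll();
      IterationResult<K, V> result = body.apply(next.kv);
      if (result.stop || next.iteration + 1 >= maxIterations) {
        out.add(result); // converged or hit the cap: leaves the loop
      } else {
        loop.add(new Pending<>(result.kv, next.iteration + 1)); // fed back
      }
    }
    return out;
  }
}
```

An element that hits the cap is emitted with `stop == false`, so a downstream consumer can tell forced termination apart from convergence.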

    Does that seem like something that might be worth exploring?

      Jan
