BTW, the iterations might break the (otherwise very useful) concept that elements arriving ON_TIME should stay ON_TIME throughout the complete computation. If an element has an excessive amount of iterations to complete, it _could_ be output late even though it would have arrived ON_TIME. But that would only mean, we might need two timeouts - one for releasing the watermark hold and another for cancelling the iteration completely.

On 6/23/21 10:43 PM, Jan Lukavský wrote:

Right, one can "outsource" this functionality through external source, but that is a sort-of hackish solution. The most serious problem is that it "disconnects" the watermark of the feedback loop which can make it tricky to correctly compute the downstream watermark. The SDF approach seems to compute the watermark correctly (using per-key watermark hold until the end of the cycle).

On 6/23/21 10:25 PM, Luke Cwik wrote:
SDF isn't required as users already try to do things like this using UnboundedSource and Pubsub.

On Wed, Jun 23, 2021 at 11:39 AM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:

    This was explored in the past, though the design started getting
    very complex (watermarks of unbounded dimension, where each
    iteration has its own watermark dimension). At the time, the
    exploration petered out.

    On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Hi,

        I'd like to discuss a very rough idea. I didn't walk through
        all the
        corner cases and the whole idea has a lot of rough edges, so
        please bear
        with me. I was thinking about non-IO applications of
        splittable DoFn,
        and the main idea - and why it is called splittable - is that
        it can
        handle unbounded outputs per element. Then I was thinking
        about what can
        generate unbounded outputs per element _without reading from
        external
        source_ (as that would be IO application) - and then I
        realized that the
        data can - at least theoretically - come from a downstream
        transform. It
        would have to be passed over an RPC (gRPC probably)
        connection, it would
        probably require some sort of service discovery - as the
        feedback loop
        would have to be correctly targeted based on key - and so on
        (those are
        the rough edges).

        But supposing this can be solved - what iterations actually
        mean is the
        we have a side channel, that come from downstream processing
        - and we
        need a watermark estimator for this channel, that is able to
        hold the
        watermark back until the very last element (at a certain
        watermark)
        finishes the iteration. The idea is then we could - in theory
        - create
        an Iteration PTransform, that would take another PTransform
        (probably
        something like PTransform<PCollection<KV<K, V>>,
        PCollection<KV<K,
        IterationResult<K, V>>>, where the IterationResult<K, V>
        would contain
        the original KV<K, V> and a stopping condition (true, false)
        and by
        creating the feedback loop from the output of this
        PCollection we could
        actually implement this without any need of support on the
        side of runners.

        Does that seem like something that might be worth exploring?

          Jan

Reply via email to