Most of the theory is particularly well-treated in "Timely Dataflow" and
"Differential Dataflow". There is a brief summary of the latter at
https://blog.acolyer.org/2015/06/17/differential-dataflow/ but I recommend
actually reading both papers. Their model uses clock ticks rather than Beam's
continuous style of watermark, but I don't think this changes the general
approach.

There are very few implementations of watermark-correct cycles AFAIK. For
Beam runners where the watermark is simulated (for example using Spark's
state) we could possibly implement them at the Beam layer. For engines where
the Beam watermark is implemented more directly (for example Dataflow & Flink)
there would be a lot of added complexity and probably performance loss, if it
could be done at all.

Kenn

On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 6/23/21 11:13 PM, Reuven Lax wrote:
>>
>>
>>
>> On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> The most qualitatively important use case I see is ACID transactions -
>>> transactions naturally involve cycles, because the most natural
>>> implementation would be something like "optimistic locking", where the
>>> transaction is allowed to progress until a downstream "commit" sees a
>>> conflict, at which point it needs to return the transaction back to the
>>> beginning (pretty much the same as how git resolves a conflict in a push).
>>>
>> True, however within a transform one could use timers to implement this
>> (there are currently some bugs around looping timers I believe, but those
>> are easier to fix than implementing a brand new programming model).
>> Iterative is only really necessary if you need to iterate an entire
>> subgraph, including GroupByKeys, etc.
>>
>> There necessarily needs to be a GBK to support transactions. If we take the
>> most prominent example of a transaction - moving some amount of cash
>> between two bank accounts - we can have a state for the current amount of
>> cash per user. This state would be keyed; a request to transfer some amount
>> would come as an atomic event "move amount X from user A to user B". This
>> would lead to updates of the keyed state of A and B, but we don't know
>> if A or B have the required amount in their account. And this is where we
>> need to do two GBKs - one will assign a sequential ID to each transaction
>> (that is, a serialized order) and the other downstream will verify that the
>> result was computed from the correct data.
>>
>
> Fair point. This could be done with state/timers in an
> eventually-consistent way (though not fully ACID) by simply sending
> messages. However in these sorts of workflow scenarios, the need for back
> edges will probably come up regardless (e.g. if a failure happens and you
> want to cancel, you might need a back edge to tell the previous key to
> cancel).
>
> However I'm still not convinced watermarks are needed.
>
>> This is maybe too complex to describe in short, but there definitely has
>> to be a GBK (actually a GroupAll operation) downstream and a cycle when a
>> post-condition fails.
>>
>>
>>
>>> Another application would be graph algorithms on changing graphs, where
>>> adding or removing an edge might trigger an iterative algorithm on the
>>> graph (and I'm absolutely not sure that the discussed approach can do that,
>>> this is just something that would be cool to do :)).
>>>
>> Yes, that's what I had in mind. I'm just not sure that these algorithms
>> lend themselves to windowing. I.e. if we added iterative support, but did
>> not have support for windowing or watermarks across iterations, have we
>> solved most of the problem?
>>
>> I don't think there is any windowing involved. When a new road is built
>> between cities A and B it _immediately_ makes traveling between these two
>> cities faster. There is no discrete boundary.
>>
>> I don't understand why we would drop support for watermarks - they would
>> be perfectly supported: every iteration key would have a watermark hold that
>> would be released when the key finished iterating - or was terminated due
>> to a timeout. I'm not sure if windowing as such plays any role in this, but
>> maybe it can.
>>
>
> You'd have to make sure things don't deadlock. If a step inside the
> transform that was being iterated had an event-time timer, what triggers
> that timer? If that timer is triggered by the watermark of the previous
> step and that watermark is being held up by the entire iteration, then this
> timer will never fire and the whole transform could deadlock. This was one
> reason for multi-dimensional watermarks - the timer can fire based on the
> watermark from the previous iterations, and so never deadlocks (though
> figuring out how to efficiently implement watermarks of unbounded
> dimensionality might be difficult).
>
>
>
>
>> On 6/23/21 10:53 PM, Reuven Lax wrote:
>>>
>>> One question I have is whether the use cases for cyclic graphs overlap
>>> substantially with the use cases for event-time watermarks. Many of the
>>> uses I'm aware of are ML-type algorithms (e.g. clustering) or iterative
>>> algorithms on large graphs (connected components, etc.), and it's unclear
>>> how many of these problems have a natural time component.
>>>
>>> On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Reuven, can you please elaborate a little on that? Why do you need a
>>>> watermark per iteration? Letting the watermark progress as soon as all the
>>>> keys arriving before the upstream watermark terminate the cycle seems like
>>>> a valid definition without the need to make the watermark multidimensional.
>>>> Yes, it introduces (possibly unbounded) latency in downstream processing,
>>>> but that is something that should probably be expected. The unboundedness of
>>>> the latency can be limited by either a fixed timeout or a number of iterations.
>>>>
>>>> On 6/23/21 8:39 PM, Reuven Lax wrote:
>>>>
>>>> This was explored in the past, though the design started getting very
>>>> complex (watermarks of unbounded dimension, where each iteration has its
>>>> own watermark dimension). At the time, the exploration petered out.
>>>>
>>>> On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd like to discuss a very rough idea. I didn't walk through all the
>>>>> corner cases and the whole idea has a lot of rough edges, so please bear
>>>>> with me. I was thinking about non-IO applications of splittable DoFn,
>>>>> and the main idea - and why it is called splittable - is that it can
>>>>> handle unbounded outputs per element. Then I was thinking about what can
>>>>> generate unbounded outputs per element _without reading from external
>>>>> source_ (as that would be IO application) - and then I realized that the
>>>>> data can - at least theoretically - come from a downstream transform. It
>>>>> would have to be passed over an RPC (gRPC probably) connection, it would
>>>>> probably require some sort of service discovery - as the feedback loop
>>>>> would have to be correctly targeted based on key - and so on (those are
>>>>> the rough edges).
>>>>>
>>>>> But supposing this can be solved - what iterations actually mean is that
>>>>> we have a side channel that comes from downstream processing - and we
>>>>> need a watermark estimator for this channel that is able to hold the
>>>>> watermark back until the very last element (at a certain watermark)
>>>>> finishes the iteration. The idea is then that we could - in theory - create
>>>>> an Iteration PTransform that would take another PTransform (probably
>>>>> something like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
>>>>> IterationResult<K, V>>>>, where the IterationResult<K, V> would contain
>>>>> the original KV<K, V> and a stopping condition (true, false)), and by
>>>>> creating the feedback loop from the output of this PCollection we could
>>>>> actually implement this without any need for support on the side of
>>>>> runners.
>>>>>
>>>>> Does that seem like something that might be worth exploring?
>>>>>
>>>>>   Jan
>>>>>
>>>>>
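
The single-dimensional watermark hold discussed in the thread above (hold the
output watermark at the timestamp of every key still iterating, and release
the hold when the key finishes or hits an iteration limit) can be sketched as
a toy model. This is only an illustration under stated assumptions: the class
IterationWatermark and all of its methods are hypothetical names, not part of
the Beam API, and the model ignores the deadlock concern raised for event-time
timers inside the loop.

```python
import math
from dataclasses import dataclass, field


@dataclass
class IterationWatermark:
    """Toy model (hypothetical, not Beam API) of a per-key watermark hold."""

    max_iterations: int                      # bound on trips around the loop
    upstream_watermark: float = -math.inf    # watermark of the loop's input
    # key -> (event timestamp, iterations completed so far)
    in_flight: dict = field(default_factory=dict)

    def start(self, key, timestamp):
        """A key enters the loop; its timestamp becomes a watermark hold."""
        self.in_flight[key] = (timestamp, 0)

    def advance_upstream(self, watermark):
        """The upstream watermark only ever moves forward."""
        self.upstream_watermark = max(self.upstream_watermark, watermark)

    def finish_iteration(self, key, done):
        """Record one trip around the loop.

        Returns True if the hold was released, either because the stopping
        condition was met or because the iteration limit was reached.
        """
        timestamp, count = self.in_flight[key]
        count += 1
        if done or count >= self.max_iterations:
            del self.in_flight[key]
            return True
        self.in_flight[key] = (timestamp, count)
        return False

    def output_watermark(self):
        """Minimum of the upstream watermark and all outstanding holds."""
        holds = [ts for ts, _ in self.in_flight.values()]
        return min([self.upstream_watermark] + holds)


wm = IterationWatermark(max_iterations=3)
wm.start("a", 10)
wm.start("b", 20)
wm.advance_upstream(30)
print(wm.output_watermark())          # 10: key "a" is still iterating
wm.finish_iteration("a", done=True)
print(wm.output_watermark())          # 20: only key "b" holds it back
for _ in range(3):                    # "b" never converges; the limit releases it
    wm.finish_iteration("b", done=False)
print(wm.output_watermark())          # 30: no holds left
```

The iteration limit is what bounds the downstream latency here; a wall-clock
timeout could play the same role.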
