The problem is that it's not obvious how to merge timers. For any obvious
strategy, I think I can imagine a use case where it will be wrong.

I'm leaning toward the conclusion that we're much better off providing an
onMerge callback and letting the user explicitly handle the merging there.
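
A minimal sketch of what such a callback could look like, assuming a
hypothetical interface that receives the firing times of the timers set in
the windows being merged and returns the firing time for the merged window.
The names TimerMergeSketch, OnMergeTimers, EARLIEST, LATEST and DROP are
illustrative assumptions; none of them is part of Beam's API. The point is
only that each "obvious" strategy is a different one-line policy, which is
why leaving the choice to the user may be safer than picking one default.

```java
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: not an existing Beam class or API.
class TimerMergeSketch {

  /** Hypothetical user callback invoked when windows are merged. */
  interface OnMergeTimers {
    // Input: firing times of the timers set in the windows being merged.
    // Output: firing time to set in the merged window, or empty for no timer.
    Optional<Instant> merge(Collection<Instant> timersInMergedWindows);
  }

  // Policy 1: keep the earliest timer (may fire before late-arriving
  // elements of the merged window have been seen).
  static final OnMergeTimers EARLIEST =
      timers -> timers.stream().min(Comparator.naturalOrder());

  // Policy 2: keep the latest timer (may delay output that one of the
  // original windows expected sooner).
  static final OnMergeTimers LATEST =
      timers -> timers.stream().max(Comparator.naturalOrder());

  // Policy 3: drop all timers and let the user set a new one explicitly.
  static final OnMergeTimers DROP = timers -> Optional.empty();

  public static void main(String[] args) {
    List<Instant> timers =
        List.of(Instant.ofEpochSecond(60), Instant.ofEpochSecond(10));
    System.out.println("earliest: " + EARLIEST.merge(timers));
    System.out.println("latest:   " + LATEST.merge(timers));
    System.out.println("drop:     " + DROP.merge(timers));
  }
}
```

Which of the three policies is correct depends on what the timer means in
the user's DoFn, which the runner cannot know.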

Reuven

On Mon, Jul 1, 2019 at 1:04 AM Jan Lukavský <[email protected]> wrote:

> What are the issues you see with merging timers? Does it "only" suffer
> from the same issue as early emitting from merging windows (i.e. needs
> retractions to work correctly), or are there some other issues?
> On 6/28/19 3:48 PM, Reuven Lax wrote:
>
> We do have merging state. However, merging timers is a bit more awkward. I
> now tend to think that we're better off providing an onMerge function and
> letting the user handle this.
>
> On Fri, Jun 28, 2019, 11:06 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> during my implementation of @RequiresTimeSortedInput I found out that
>> current runners do not support stateful DoFns on merging windows [1].
>> The question is: why is that? One obvious reason seems to be that the
>> current definition of StateSpec doesn't define a state merge function,
>> which is necessary for merging windows to work. Actually, in Euphoria
>> [2] (originated separately, now merged into Beam) we had only two basic
>> operators (PTransforms) - FlatMap (stateless ParDo) and an operation we
>> called ReduceStateByKey [3] (not merged into Beam, as there were some
>> difficulties, one of which might be the missing support for merging
>> windows). All the other operations could be derived from these two. The
>> ReduceStateByKey (RSBK) operator was a keyed operator (shuffle) with the
>> following user-defined functions:
>>
>>   - state factory (roughly equivalent to declaring a StateSpec)
>>
>>   - state merge function (State1 + State2 = State3)
>>
>>   - state update function (State + Value = new State)
>>
>>   - and state extract function (State -> Output) -- actually called
>> after each element was added to the state
>>
>> Now if you look at this, this is essentially both Beam's Combine
>> operator and stateful ParDo. Moreover, GroupByKey is just RSBK with
>> BagState and an append merge function. So, a question that comes to mind:
>> what would happen if we added a state merge function to StateSpec? I think
>> it could have the following benefits:
>>
>>   - Both Combine and GroupByKey can become derived operators (this is no
>> breaking change, as runners are always free to provide their override to
>> any PTransform)
>>
>>   - in batch, stateful ParDo can now be implemented more efficiently,
>> using a Combine operation (as long as it doesn't require
>> @RequiresTimeSortedInput, which is my favourite :))
>>
>>   - even in streaming, a combining approach to stateful ParDo would be
>> possible (provided the trigger is AfterWatermark with no early
>> firings, and there are no user timers)
>>
>>   - there is still a problem with merging windows on stateful DoFns,
>> namely early firings in general (these need retractions, which is the
>> issue we first hit here [4] and solved by disabling early emitting from
>> merging windows)
>>
>> I'd really like to hear any comments on this.
>>
>> Jan
>>
>> [1]
>>
>> https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71
>>
>> [2]
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria
>>
>> [3]
>>
>> https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
>>
>> [4] https://github.com/seznam/euphoria/issues/43
>>
>>
