What I'm attempting is a variation on Session windows in which there may
exist a "terminal" element in the stream that immediately stops the session
(or perhaps stops it after some configured delay).

My implementation behaves just like Sessions until such a "terminal"
element is encountered, in which case I mark the window as "terminal", and
all windows "merge down" such that any terminal window gets to dictate the
Interval.end()/Window.maxTimestamp() of the merged result.

As a trivial example, if I have windows W1 [0, 100) and W2 [50, 75) with
terminal = true, then the merged result will be W3 [0, 75).
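To make the merge-down rule concrete, here is a minimal plain-Java sketch. It is not Beam's `WindowFn`/`IntervalWindow` API; the class names `SessionWindow` and `MergeDown` are hypothetical. It assumes half-open millisecond intervals and, where several terminal windows merge together, that the earliest terminal end wins (a choice not stated above).

```java
import java.util.List;

// Hypothetical plain-Java model of the windows described above; not Beam's IntervalWindow.
// Bounds are epoch millis and the interval is half-open: [start, end).
final class SessionWindow {
    final long start;
    final long end;
    final boolean terminal;

    SessionWindow(long start, long end, boolean terminal) {
        if (end <= start) {
            throw new IllegalArgumentException("empty window");
        }
        this.start = start;
        this.end = end;
        this.terminal = terminal;
    }

    // Like an interval window's maxTimestamp(): the last millisecond inside [start, end).
    long maxTimestamp() {
        return end - 1;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")" + (terminal ? " terminal" : "");
    }
}

final class MergeDown {
    // Merges a non-empty group of overlapping windows into one.
    // Plain sessions take the union (latest end); if any source is terminal,
    // the earliest terminal end dictates the merged end (assumption).
    static SessionWindow merge(List<SessionWindow> toBeMerged) {
        long start = Long.MAX_VALUE;
        long maxEnd = Long.MIN_VALUE;
        long terminalEnd = Long.MAX_VALUE;
        boolean anyTerminal = false;
        for (SessionWindow w : toBeMerged) {
            start = Math.min(start, w.start);
            maxEnd = Math.max(maxEnd, w.end);
            if (w.terminal) {
                anyTerminal = true;
                terminalEnd = Math.min(terminalEnd, w.end);
            }
        }
        return new SessionWindow(start, anyTerminal ? terminalEnd : maxEnd, anyTerminal);
    }
}
```

Merging W1 [0, 100) with the terminal W2 [50, 75) yields a terminal [0, 75), matching W3 above; with no terminal window in the group, the result is the ordinary session union.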

I've been successful doing this so far, but I've been inferring some
invariants about windows that I'm not sure are official or documented
anywhere.

The invariants that I've inferred go like this:

(I) Definition. An element is "in" window W if it originated in W or in a
window that was merged into W (recursively).
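Definition (I) amounts to following a chain of merges from the window an element originated in. A minimal sketch, assuming each merge is recorded as a source-to-result mapping and that merges never loop back to an earlier window; `MergeLineage` is a hypothetical helper, not part of Beam:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam: records merges so Definition (I) can be evaluated.
// Assumes merges never form a cycle (a window is never merged back into one of its ancestors).
final class MergeLineage<W> {
    // Maps each merged-away window to the window it was merged into.
    private final Map<W, W> mergedInto = new HashMap<>();

    // Records that `source` was merged into `result`; merging a window into itself is a no-op.
    void recordMerge(W source, W result) {
        if (!source.equals(result)) {
            mergedInto.put(source, result);
        }
    }

    // An element whose origin window is `origin` is "in" `w` if `w` is the origin itself
    // or any window reached by following merges from the origin.
    boolean isIn(W origin, W w) {
        W current = origin;
        while (current != null) {
            if (current.equals(w)) {
                return true;
            }
            current = mergedInto.get(current);
        }
        return false;
    }

    // The window that currently holds elements that originated in `origin`.
    W currentWindowOf(W origin) {
        W current = origin;
        while (mergedInto.containsKey(current)) {
            current = mergedInto.get(current);
        }
        return current;
    }
}
```

After W1 and W2 merge into W3, and W3 later merges into W5, an element that originated in W1 is "in" W1, W3 and W5.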

(II) Invariant. Any element, e, in window W MUST have e.timestamp <=
W.maxTimestamp().

So far, I think this is obvious and true (I hope). (It would actually be
even better if there were a way for (II) not to have to hold, but that is
a separate discussion, I think.)

The main invariant I'm trying to formalize is one that allows me to "merge
down", i.e., to merge in such a way that the merged window's
(mergedResult's) maxTimestamp *is less than* the maxTimestamp of one of
the source (toBeMerged) windows.

The (undocumented?) invariant I've been working from goes something like
this:

(III) Corollary. Windows W1 and W2 can merge such that maxTimestamp()
regresses (moves backward in time, aka "merge down") in the merged
window; however, they cannot merge such that (II) is ever violated.
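One way (III) could be enforced, sketched in plain Java under loud assumptions: each hypothetical `TrackedWindow` carries the latest timestamp of any element in it. Beam does not supply this; as far as I know, `WindowFn.MergeContext` exposes only the windows being merged, not their elements, so a real `WindowFn` would have to carry such a field on the custom window itself. The hypothetical `SafeMergeDown.merge` moves the end back toward the earliest terminal end but never below latest element timestamp + 1, so (II) still holds.

```java
import java.util.List;

// Hypothetical plain-Java model, not Beam's API. Each window carries the latest timestamp
// of any element "in" it (Definition I); this field is an assumption, not something Beam provides.
final class TrackedWindow {
    final long start;           // inclusive, epoch millis
    final long end;             // exclusive, epoch millis
    final boolean terminal;
    final long latestElementTs; // assumption: max timestamp of any element in this window

    TrackedWindow(long start, long end, boolean terminal, long latestElementTs) {
        this.start = start;
        this.end = end;
        this.terminal = terminal;
        this.latestElementTs = latestElementTs;
    }

    // Last millisecond inside the half-open interval [start, end).
    long maxTimestamp() {
        return end - 1;
    }

    // Invariant (II): every element in the window has timestamp <= maxTimestamp().
    boolean satisfiesInvariantII() {
        return latestElementTs <= maxTimestamp();
    }
}

final class SafeMergeDown {
    // Corollary (III): the merged end may regress toward the earliest terminal end,
    // but is clamped at latestElementTs + 1 so that (II) is never violated.
    // Assumes a non-empty group whose sources each already satisfy (II).
    static TrackedWindow merge(List<TrackedWindow> toBeMerged) {
        long start = Long.MAX_VALUE;
        long maxEnd = Long.MIN_VALUE;
        long terminalEnd = Long.MAX_VALUE;
        long latest = Long.MIN_VALUE;
        boolean anyTerminal = false;
        for (TrackedWindow w : toBeMerged) {
            start = Math.min(start, w.start);
            maxEnd = Math.max(maxEnd, w.end);
            latest = Math.max(latest, w.latestElementTs);
            if (w.terminal) {
                anyTerminal = true;
                terminalEnd = Math.min(terminalEnd, w.end);
            }
        }
        long end = anyTerminal ? Math.max(terminalEnd, latest + 1) : maxEnd;
        return new TrackedWindow(start, end, anyTerminal, latest);
    }
}
```

If W1 [0, 100) holds an element at 90, merging it with the terminal W2 [50, 75) yields [0, 91) rather than [0, 75), because [0, 75) would place that element past maxTimestamp() = 74. Clamping is only one design choice; declining to merge such windows would also respect (II).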

Is this correct?

(If this can be confirmed, I'll go back and ensure I'm not violating the
merge() precondition or these invariants, and post some code if needed.)
Thank you for your assistance here!


On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <[email protected]> wrote:

> Have you used Dataflow's update feature on this pipeline? Also, do
> you have the code for your WindowFn?
>
> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <[email protected]> wrote:
>
>> Dataflow. (See stacktrace)
>>
>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <[email protected]> wrote:
>>
>>> Which runner are you using?
>>>
>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <[email protected]> wrote:
>>>
>>>> I get an IllegalStateException "<window> is in more than one state
>>>> address window set" (stacktrace below).
>>>>
>>>> What does this mean? What invariant of custom window implementation
>>>> & merging am I violating?
>>>>
>>>> Thank you for any advice.
>>>>
>>>> ```
>>>> java.lang.IllegalStateException: {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more than one state address window set
>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:334)
>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:88)
>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:380)
>>>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>> ...
>>>> ```
>>>>
>>>
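Judging from the method name and message in the stack trace, `MergingActiveWindowSet.checkInvariants` requires that each "state address" window (roughly, a window under which state is stored on behalf of an active window) belong to the set of at most one active window. The plain-Java model below reproduces that kind of check; `StateAddressCheck` is hypothetical and is not the runner's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the invariant named in the error message; NOT Beam's MergingActiveWindowSet.
// Maps each active window to the set of state-address windows whose state it owns.
final class StateAddressCheck {
    static <W> void checkInvariants(Map<W, Set<W>> stateAddressSets) {
        // Remembers which active window first claimed each state-address window.
        Map<W, W> owner = new HashMap<>();
        for (Map.Entry<W, Set<W>> entry : stateAddressSets.entrySet()) {
            for (W address : entry.getValue()) {
                W previous = owner.put(address, entry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                            address + " is in more than one state address window set");
                }
            }
        }
    }
}
```

A merge that leaves the same window recorded under two active windows (for example, W2 under both W3 and W5) fails this check with a message of the same shape as the one in the stack trace.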