For future reference (for anyone searching the mailing list for this or a similar
issue), the ticket Kenneth pointed to (
https://issues.apache.org/jira/browse/BEAM-654) is precisely the use case I
have: "session windows with a terminal/stop event". For now I've opted to
use the State API (
https://beam.apache.org/blog/2017/02/13/stateful-processing.html) to
achieve my particularly special windowing needs, as it yields a
straightforward implementation that I can prove correct (and for my use
cases, distributing the windowing logic for a key is not a performance win).

However, I'm very interested in seeing how the windowing merge semantics get
nailed down and evolve, and in exploring what kind of innovative things could
ultimately be done with them.

On Fri, Jan 10, 2020 at 4:38 PM Aaron Dixon wrote:

> Once again, this is a great help. Thank you, Kenneth.
>
> On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles wrote:
>
>> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
>> Your invariants are right. In fact, the Nexmark queries have auctions that
>> truncate in a similar way. This prompted
>> https://issues.apache.org/jira/browse/BEAM-654.  I think we have not
>> really nailed down the right spec for merging, and we certainly aren't
>> enforcing it. To be robust, your merging should be associative and
>> commutative, which means that you can't have an "end of session" event that
>> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
>> window functions that split... it is mostly unexplored, semantically.
>>
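To make the associativity/commutativity point concrete, here is a minimal plain-Java sketch (no Beam dependency) that adds session windows one at a time, as they might arrive in a streaming pipeline, and merges overlapping ones using the terminal "merge down" rule described further down in this thread. The `Win` class, the `addAll` helper, and the choice that the earliest end wins when both windows are terminal are hypothetical stand-ins, not Beam's `IntervalWindow` or `WindowFn` API. The point is that the final windows can depend on arrival order:

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalMergeOrder {
  // Hypothetical stand-in for a session window [start, end) with a terminal flag.
  // Models the behavior only; this is not Beam's IntervalWindow/WindowFn API.
  static final class Win {
    final long start;
    final long end;
    final boolean terminal;

    Win(long start, long end, boolean terminal) {
      this.start = start;
      this.end = end;
      this.terminal = terminal;
    }

    boolean overlaps(Win o) {
      return start < o.end && o.start < end;
    }

    // Hypothetical "merge down" rule: a terminal window dictates the end;
    // otherwise behave like Sessions and take the later end.
    Win merge(Win o) {
      long s = Math.min(start, o.start);
      long e;
      if (terminal && o.terminal) {
        e = Math.min(end, o.end);
      } else if (terminal) {
        e = end;
      } else if (o.terminal) {
        e = o.end;
      } else {
        e = Math.max(end, o.end);
      }
      return new Win(s, e, terminal || o.terminal);
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + (terminal ? ", terminal)" : ")");
    }
  }

  // Adds windows in arrival order: each new window absorbs every currently
  // active window it overlaps, re-checking overlap after each merge.
  static List<Win> addAll(List<Win> arrivals) {
    List<Win> active = new ArrayList<>();
    for (Win w : arrivals) {
      Win merged = w;
      boolean changed = true;
      while (changed) {
        changed = false;
        for (int i = 0; i < active.size(); i++) {
          if (active.get(i).overlaps(merged)) {
            merged = merged.merge(active.remove(i));
            changed = true;
            break;
          }
        }
      }
      active.add(merged);
    }
    active.sort((x, y) -> Long.compare(x.start, y.start));
    return active;
  }

  public static void main(String[] args) {
    Win a = new Win(0, 100, false);
    Win b = new Win(50, 75, true);
    Win c = new Win(80, 110, false);
    System.out.println(addAll(List.of(a, b, c))); // [[0, 75, terminal), [80, 110)]
    System.out.println(addAll(List.of(a, c, b))); // [[0, 75, terminal)]
  }
}
```

With arrival order a, b, c, the terminal window shrinks the session to [0, 75) before c shows up, so c stays separate. With order a, c, b, window c has already been absorbed into [0, 110) when the terminal window cuts it back to [0, 75), so c's elements (timestamps of 80 or later) end up past the merged window's end. That is the kind of "end of session" event contradicting an earlier merge described above.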
>> About the error, this may help debug: The "state address windows" for a
>> given merged window are all the windows that contribute to it. This means
>> that when windows A and B merge to become a window AB, we can leave the
>> accumulated state stored with A and B and just note that when we read from
>> AB we actually have to read from both A and B*. So suppose windows A and B
>> are about to merge. Before merge, the state address window map is:
>>
>> A -> [A]
>> B -> [B]
>>
>> After the merge, there is a new window AB and a "window to state address
>> window" mapping:
>>
>> AB -> [A, B]
>>
>> The error means that there is more than one merged window that will read
>> data from a pre-merged window. So there is a situation like
>>
>> AB -> [A, B]
>> BC -> [B, C]
>>
>> This is not intended to happen. It would be the consequence of B merging
>> into two different new windows. Hence it is an internal error. Most likely
>> a bug or a mismatch based on the assumptions. Note that this code/logic is
>> shared by all runners. I do think you can write a WindowFn that induces it.
>>
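The state address window bookkeeping described above can be modeled as a map from each active window to the pre-merge windows it reads state from. The following is a toy sketch under that assumption: `StateAddressModel` and its methods are hypothetical and are not Beam's `MergingActiveWindowSet`, though the invariant check mirrors the message in the stack trace at the bottom of this thread:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StateAddressModel {
  // Toy model: active (possibly merged) window -> the pre-merge windows whose
  // stored state it reads from. Not Beam's MergingActiveWindowSet.
  final Map<String, Set<String>> stateAddresses = new LinkedHashMap<>();

  // A brand-new window reads only its own state: W -> [W].
  void addNew(String w) {
    stateAddresses.put(w, new TreeSet<>(Set.of(w)));
  }

  // In this model merging moves no data; the result just records that it
  // must read from every state address window of its sources.
  void merge(String result, String... sources) {
    Set<String> union = new TreeSet<>();
    for (String s : sources) {
      Set<String> addrs = stateAddresses.remove(s);
      if (addrs == null) {
        throw new IllegalArgumentException(s + " is not an active window");
      }
      union.addAll(addrs);
    }
    stateAddresses.put(result, union);
  }

  // Each state address window may be read by at most one active window.
  void checkInvariants() {
    Map<String, String> owner = new HashMap<>();
    for (Map.Entry<String, Set<String>> e : stateAddresses.entrySet()) {
      for (String addr : e.getValue()) {
        String previous = owner.put(addr, e.getKey());
        if (previous != null) {
          throw new IllegalStateException(
              addr + " is in more than one state address window set");
        }
      }
    }
  }

  public static void main(String[] args) {
    StateAddressModel m = new StateAddressModel();
    m.addNew("A");
    m.addNew("B");
    m.merge("AB", "A", "B");
    m.checkInvariants();
    System.out.println(m.stateAddresses); // {AB=[A, B]}

    // Simulate the bad state: B's data claimed by a second merged window BC.
    m.stateAddresses.put("BC", new TreeSet<>(Set.of("B", "C")));
    try {
      m.checkInvariants();
    } catch (IllegalStateException ex) {
      System.out.println(ex.getMessage()); // B is in more than one state address window set
    }
  }
}
```

The first check passes with AB -> [A, B]. Adding BC -> [B, C] by hand reproduces the situation where B would have merged into two different windows, and the check fails with "B is in more than one state address window set".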
>> Kenn
>>
>> *This was intended to be a performance optimization, but eagerly copying
>> the data turned out to be faster, so now it is a legacy compatibility thing
>> that we could remove, I think; changing this code is tricky, though.
>>
>> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon wrote:
>>
>>> What I'm attempting is a variation on Session windows in which there may
>>> exist a "terminal" element in the stream that immediately stops the session
>>> (or perhaps after some configured delay.)
>>>
>>> My implementation behaves just like Sessions until such a "terminal"
>>> element is encountered, in which case I mark the window as "terminal" and
>>> all windows "merge down" such that any terminal windows get to dictate the
>>> Interval.end()/Window.maxTimestamp().
>>>
>>> So, as a trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>>> terminal = true], then the merged result will be W3 [0, 75).
>>>
>>> I've been successful doing this so far but I've been inferring some
>>> invariants about windows that I'm not sure are official or documented
>>> anywhere.
>>>
>>> The invariants that I've inferred go like this:
>>>
>>> (I) Definition. An element is "in" window W if it originated in W or in
>>> a window that was (recursively) merged into W.
>>>
>>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>>> W.maxTimestamp().
>>>
>>> So far, I think this is obvious and true (I hope). (It would actually be
>>> great if there were a way for II not to have to hold, but that is a
>>> separate discussion, I think.)
>>>
>>> The main invariant I'm trying to formalize is one that allows me to
>>> "merge down", i.e., to merge in such a way that the merged window's
>>> (mergedResult's) maxTimestamp *is less than* the maxTimestamp of one of
>>> the source (toBeMerged) windows.
>>>
>>> The (undocumented?) invariant I've been working from goes something like
>>> this:
>>>
>>> (III) Corollary. Windows W1 and W2 can merge such that either window's
>>> maxTimestamp() is regressed (moved backward in time, aka "merge down") in
>>> the merged window; however, they cannot merge such that (II) is ever
>>> violated.
>>>
>>> Is this correct?
>>>
>>> (If this can be confirmed, I'll go back and ensure I'm not violating
>>> the merge() precondition and these invariants, and post some code if
>>> needed.) Thank you for your assistance here!
>>>
>>>
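Invariants (II) and (III) can be checked mechanically. Below is a minimal sketch of the "merge down" rule together with a check of invariant (II), assuming windows are half-open intervals [start, end) with maxTimestamp() = end - 1 (which matches Beam's `IntervalWindow`). The `Win` class, `mergeDown`, and `satisfiesInvariantII` are hypothetical, as is the choice that the earliest end wins when both windows are terminal:

```java
import java.util.List;

public class MergeDownCheck {
  // Hypothetical window model [start, end) with a terminal flag. As with
  // Beam's IntervalWindow, the max timestamp is taken to be end - 1.
  static final class Win {
    final long start;
    final long end;
    final boolean terminal;

    Win(long start, long end, boolean terminal) {
      this.start = start;
      this.end = end;
      this.terminal = terminal;
    }

    long maxTimestamp() {
      return end - 1;
    }
  }

  // Hypothetical merge-down rule: if either side is terminal, the earliest
  // terminal end wins; otherwise behave like Sessions and take the later end.
  static Win mergeDown(Win x, Win y) {
    long start = Math.min(x.start, y.start);
    long end;
    if (x.terminal && y.terminal) {
      end = Math.min(x.end, y.end);
    } else if (x.terminal) {
      end = x.end;
    } else if (y.terminal) {
      end = y.end;
    } else {
      end = Math.max(x.end, y.end);
    }
    return new Win(start, end, x.terminal || y.terminal);
  }

  // Invariant (II): every element now "in" the merged window must have
  // timestamp <= merged.maxTimestamp(). Merging down is only safe if this holds.
  static boolean satisfiesInvariantII(Win merged, List<Long> elementTimestamps) {
    for (long ts : elementTimestamps) {
      if (ts > merged.maxTimestamp()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Win w1 = new Win(0, 100, false);
    Win w2 = new Win(50, 75, true);
    Win w3 = mergeDown(w1, w2);
    System.out.println("[" + w3.start + ", " + w3.end + ")"); // [0, 75)

    // Elements at 10, 50 and 60 all fit under maxTimestamp 74.
    System.out.println(satisfiesInvariantII(w3, List.of(10L, 50L, 60L))); // true
    // An element at 90 that was in W1 would now sit past the end of W3.
    System.out.println(satisfiesInvariantII(w3, List.of(10L, 50L, 90L))); // false
  }
}
```

The first result reproduces the trivial W1/W2 example above. The second shows why (III) needs its caveat: a merge that regresses maxTimestamp() is fine only if no element already in the source windows lies past the new end, and since merge logic works on windows rather than on individual elements, the WindowFn has to guarantee this from the window bounds alone.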
>>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax wrote:
>>>
>>>> Have you used Dataflow's update feature on this pipeline? Also, do
>>>> you have the code for your WindowFn?
>>>>
>>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon wrote:
>>>>
>>>>> Dataflow. (See stacktrace)
>>>>>
>>>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax wrote:
>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon wrote:
>>>>>>
>>>>>>> I get an IllegalStateException "<window> is in more than one state
>>>>>>> address window set" (stacktrace below).
>>>>>>>
>>>>>>> What does this mean? What invariant of custom window implementation
>>>>>>> & merging am I violating?
>>>>>>>
>>>>>>> Thank you for any advice.
>>>>>>>
>>>>>>> ```
>>>>>>> java.lang.IllegalStateException: {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more than one state address window set
>>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:334)
>>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:88)
>>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:380)
>>>>>>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>>>>     ...
>>>>>>> ```
>>>>>>>
>>>>>>