Once again, this is a great help. Thank you, Kenneth.
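
For anyone reading the archive later, here is a minimal sketch of the invariant Kenn describes below, as I understand it. It is not Beam's actual MergingActiveWindowSet code: the class name StateAddressCheck, the use of plain strings for windows, and the map layout are my own assumptions for illustration. It only checks that no pre-merge window appears in more than one state address window set:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Apache Beam.
final class StateAddressCheck {

    /**
     * Takes a map from each merged (active) window to the pre-merge windows
     * whose state it reads, and fails if any pre-merge window is claimed by
     * more than one merged window.
     */
    static void checkInvariants(Map<String, List<String>> windowToStateAddresses) {
        // Pre-merge window -> the merged window that first claimed it.
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : windowToStateAddresses.entrySet()) {
            for (String stateAddressWindow : entry.getValue()) {
                String previous = owner.put(stateAddressWindow, entry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                        stateAddressWindow + " is in more than one state address window set");
                }
            }
        }
    }
}
```

With AB -> [A, B] and C -> [C] the check passes. With AB -> [A, B] and BC -> [B, C] it throws, because B would back two merged windows.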

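Similarly, a rough sketch of the "merge down" example from my earlier message (quoted below), W1 [0, 100) and W2 [50, 75, terminal = true] merging to W3 [0, 75), together with a check of invariant (II). The names TerminalSessionSketch and Win are hypothetical, maxTimestamp() is assumed to be end - 1 as with interval windows, and picking the earliest terminal end when several terminal windows merge is an assumption, not something settled in this thread:

```java
import java.util.List;

// Hypothetical illustration only; not the WindowFn discussed in this thread.
final class TerminalSessionSketch {

    /** Half-open interval [start, end) in milliseconds, optionally terminal. */
    static final class Win {
        final long start;
        final long end;
        final boolean terminal;

        Win(long start, long end, boolean terminal) {
            this.start = start;
            this.end = end;
            this.terminal = terminal;
        }

        // Assumption: the last timestamp that belongs to [start, end).
        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Merges windows; any terminal window dictates the merged end. */
    static Win merge(List<Win> toBeMerged) {
        long start = Long.MAX_VALUE;
        long maxEnd = Long.MIN_VALUE;
        long minTerminalEnd = Long.MAX_VALUE;
        boolean anyTerminal = false;
        for (Win w : toBeMerged) {
            start = Math.min(start, w.start);
            maxEnd = Math.max(maxEnd, w.end);
            if (w.terminal) {
                anyTerminal = true;
                // Assumption: with several terminal windows, the earliest end wins.
                minTerminalEnd = Math.min(minTerminalEnd, w.end);
            }
        }
        return new Win(start, anyTerminal ? minTerminalEnd : maxEnd, anyTerminal);
    }

    /** Invariant (II): every element timestamp is <= maxTimestamp(). */
    static boolean satisfiesInvariantII(Win w, long... elementTimestamps) {
        for (long t : elementTimestamps) {
            if (t > w.maxTimestamp()) {
                return false;
            }
        }
        return true;
    }
}
```

Merging W1 and W2 this way is only safe if no element already in W1 has a timestamp past 74. An element at 90, say, would violate (II) in the merged window.
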
On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles <[email protected]> wrote:

> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
> Your invariants are right. In fact, the Nexmark queries have auctions that
> truncate in a similar way. This prompted
> https://issues.apache.org/jira/browse/BEAM-654.  I think we have not
> really nailed down the right spec for merging, and we certainly aren't
> enforcing it. To be robust, your merging should be associative and
> commutative, which means that you can't have an "end of session" event that
> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
> window functions that split... it is mostly unexplored, semantically.
>
> About the error, this may help debug: The "state address windows" for a
> given merged window are all the windows that contribute to it. This means
> that when windows A and B merge to become a window AB, we can leave the
> accumulated state stored with A and B and just note that when we read from
> AB we actually have to read from both A and B*. So suppose windows A and B
> are about to merge. Before merge, the state address window map is:
>
> A -> [A]
> B -> [B]
>
> After the merge, there is a new window AB and the "window to state address
> window" mapping is:
>
> AB -> [A, B]
>
> The error means that there is more than one merged window that will read
> data from a pre-merged window. So there is a situation like
>
> AB -> [A, B]
> BC -> [B, C]
>
> This is not intended to happen. It would be the consequence of B merging
> into two different new windows. Hence it is an internal error. Most likely
> a bug or a mismatch based on the assumptions. Note that this code/logic is
> shared by all runners. I do think you can write a WindowFn that induces it.
>
> Kenn
>
> *This was intended to be a performance optimization, but eagerly copying
> the data turned out to be faster, so now it is a legacy compatibility thing
> that we could remove, I think, but changing this code is tricky.
>
> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon <[email protected]> wrote:
>
>> What I'm attempting is a variation on Session windows in which there may
>> exist a "terminal" element in the stream that immediately stops the session
>> (or perhaps after some configured delay.)
>>
>> My implementation behaves just like Sessions until any such "terminal"
>> element is encountered in which case I mark the window as "terminal" and
>> all windows "merge down" such that any terminal windows get to dictate the
>> Interval.end()/Window.maxTimestamp().
>>
>> So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>> terminal = true] then the merged result will be W3 [0, 75).
>>
>> I've been successful doing this so far but I've been inferring some
>> invariants about windows that I'm not sure are official or documented
>> anywhere.
>>
>> The invariants that I've inferred go like this:
>>
>> (I) Definition. An element is "in" window W if it originated in W or in a
>> window that was merged into W (recursively).
>>
>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>> W.maxTimestamp().
>>
>> So far, I think this is obvious and true stuff (I hope). (It would
>> actually be great if there were a way for (II) not to have to hold, but
>> that is a separate discussion, I think.)
>>
>> The main invariant I'm trying to formalize is one that allows me to
>> "merge down" -- i.e., to merge in such a way that the merged window's
>> (mergedResult's) maxTimestamp *is less than* one of the source's
>> (toBeMerged's) windows' maxTimestamp.
>>
>> The (undocumented?) invariant I've been working from goes something like
>> this:
>>
>> (III) Corollary. Windows W1 and W2 can merge such that either
>> maxTimestamp() is regressed (moved backward in time aka "merge down") in
>> the merged window -- however they cannot merge such that (II) is ever
>> violated.
>>
>> Is this correct?
>>
>> (If this can be confirmed, I'll go back and ensure I'm not violating the
>> merge() precondition and these invariants, and post some code if needed.)
>> Thank you for your assistance here!
>>
>>
>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <[email protected]> wrote:
>>
>>> Have you used Dataflow's update feature on this pipeline? Also, do
>>> you have the code for your WindowFn?
>>>
>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <[email protected]> wrote:
>>>
>>>> Dataflow. (See stacktrace)
>>>>
>>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Which runner are you using?
>>>>>
>>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <[email protected]> wrote:
>>>>>
>>>>>> I get an IllegalStateException "<window> is in more than one state
>>>>>> address window set" (stacktrace below).
>>>>>>
>>>>>> What does this mean? What invariant of custom window implementation
>>>>>> & merging am I violating?
>>>>>>
>>>>>> Thank you for any advice.
>>>>>>
>>>>>> ```
>>>>>> java.lang.IllegalStateException: {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more than one state address window set
>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:334)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:88)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:380)
>>>>>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>>> ...
>>>>>> ```
>>>>>>
>>>>>
