Once again, this is a great help. Thank you, Kenneth.

On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles <[email protected]> wrote:
> Hmm. I've seen this manifest in some other tweaked versions of Sessions. Your invariants are right. In fact, the Nexmark queries have auctions that truncate in a similar way. This prompted https://issues.apache.org/jira/browse/BEAM-654. I think we have not really nailed down the right spec for merging, and we certainly aren't enforcing it. To be robust, your merging should be associative and commutative, which means that you can't have an "end of session" event that contradicts a merge that occurred. OTOH I also know that Tyler has hacked window functions that split... it is mostly unexplored, semantically.
>
> About the error, this may help debug: the "state address windows" for a given merged window are all the windows that contribute to it. This means that when windows A and B merge to become a window AB, we can leave the accumulated state stored with A and B and just note that when we read from AB we actually have to read from both A and B*. So suppose windows A and B are about to merge. Before the merge, the state address window map is:
>
> A -> [A]
> B -> [B]
>
> After the merge, there is a new window AB and a "window to state address windows" mapping:
>
> AB -> [A, B]
>
> The error means that there is more than one merged window that will read data from a pre-merged window. So there is a situation like:
>
> AB -> [A, B]
> BC -> [B, C]
>
> This is not intended to happen. It would be the consequence of B merging into two different new windows. Hence it is an internal error: most likely a bug, or a mismatch with the assumptions. Note that this code/logic is shared by all runners. I do think you can write a WindowFn that induces it.
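A minimal sketch of the condition Kenn describes, assuming windows are modeled as plain strings and the "window to state address windows" map as an ordinary Java `Map`. The names `StateAddressCheck` and `findSharedStateAddress` are hypothetical. This is not Beam's `MergingActiveWindowSet` code. It only illustrates the condition whose violation surfaces as the "is in more than one state address window set" error.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the "merged window -> state address windows" map; windows are strings. */
public class StateAddressCheck {

  /**
   * Returns the first pre-merge window that appears in more than one state address set,
   * or null if every pre-merge window is read by exactly one merged window.
   */
  public static String findSharedStateAddress(Map<String, List<String>> windowToStateAddresses) {
    Set<String> seen = new HashSet<>();
    for (List<String> stateAddresses : windowToStateAddresses.values()) {
      for (String w : stateAddresses) {
        if (!seen.add(w)) {
          return w; // w would be read by two different merged windows
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // After a legal merge of A and B: AB reads state from both A and B.
    Map<String, List<String>> ok = new LinkedHashMap<>();
    ok.put("AB", List.of("A", "B"));
    ok.put("C", List.of("C"));

    // The situation from the thread: B has been merged into two different windows.
    Map<String, List<String>> bad = new LinkedHashMap<>();
    bad.put("AB", List.of("A", "B"));
    bad.put("BC", List.of("B", "C"));

    System.out.println(findSharedStateAddress(ok));  // null
    System.out.println(findSharedStateAddress(bad)); // B
  }
}
```

In the second map, B is reported because both AB and BC would read its state. That matches the "B merging into two different new windows" case described above.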
>
> Kenn
>
> *This was intended to be a performance optimization, but eagerly copying the data turned out to be faster, so now it is a legacy compatibility thing. I think we could remove it, but changing this code is tricky.
>
> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon <[email protected]> wrote:
>
>> What I'm attempting is a variation on Session windows in which there may exist a "terminal" element in the stream that immediately stops the session (or perhaps stops it after some configured delay).
>>
>> My implementation behaves just like Sessions until such a "terminal" element is encountered. In that case I mark the window as "terminal", and all windows "merge down" such that any terminal windows get to dictate the Interval.end()/Window.maxTimestamp().
>>
>> So, trivial example: if I have windows W1 [0, 100) and W2 [50, 75, terminal = true], then the merged result will be W3 [0, 75).
>>
>> I've been successful doing this so far, but I've been inferring some invariants about windows that I'm not sure are official or documented anywhere.
>>
>> The invariants that I've inferred go like this:
>>
>> (I) Definition. An element is "in" window W if it originated in W or in a window that was merged (recursively) into W.
>>
>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <= W.maxTimestamp().
>>
>> So far, I think this is obvious and true stuff (I hope). (It would actually be great if there were a way for (II) not to have to hold, but that is a whole separate discussion, I think.)
>>
>> The main invariant I'm trying to formalize is one that allows me to "merge down", i.e., to merge in such a way that the merged window's (mergedResult's) maxTimestamp *is less than* the maxTimestamp of one of the source (toBeMerged) windows.
>>
>> The (undocumented?) invariant I've been working from goes something like this:
>>
>> (III) Corollary. Windows W1 and W2 can merge such that maxTimestamp() is regressed (moved backward in time, aka "merge down") in the merged window; however, they cannot merge such that (II) is ever violated.
>>
>> Is this correct?
>>
>> (If this can be confirmed, I'll go back and make sure I'm not violating the merge() precondition and these invariants, and post some code if needed.) Thank you for your assistance here!
>>
>>
>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <[email protected]> wrote:
>>
>>> Have you used Dataflow's update feature on this pipeline? Also, do you have the code for your WindowFn?
>>>
>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <[email protected]> wrote:
>>>
>>>> Dataflow. (See stacktrace)
>>>>
>>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Which runner are you using?
>>>>>
>>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <[email protected]> wrote:
>>>>>
>>>>>> I get an IllegalStateException "<window> is in more than one state address window set" (stacktrace below).
>>>>>>
>>>>>> What does this mean? What invariant of custom window implementation & merging am I violating?
>>>>>>
>>>>>> Thank you for any advice.
>>>>>>
>>>>>> ```
>>>>>> java.lang.IllegalStateException: {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more than one state address window set
>>>>>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:334)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:88)
>>>>>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:380)
>>>>>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>>>     ...
>>>>>> ```
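The terminal-session merge rule from the thread, along with invariant (II), can be sketched roughly as follows. This is a hypothetical model, not Aaron's WindowFn and not Beam's API. `TerminalSessions` and `Win` are illustrative names, and timestamps are plain millisecond `long`s. In a real Beam `WindowFn`, the rule would live inside `mergeWindows(MergeContext)` and be reported through `c.merge(toBeMerged, mergeResult)`. One behavior is an assumption, not something stated in the thread: when several terminal windows merge, the earliest terminal end wins. The final lines also show why such a rule depends on order. Take W0 = [80, 120). It overlaps W1 but not the merged-down [0, 75), so whether W0 joins the session depends on whether it is merged before or after the terminal window arrives.

```java
import java.util.List;

/** Hypothetical model of a "terminal session" window: [start, end) in millis plus a terminal flag. */
public class TerminalSessions {

  public static final class Win {
    public final long start;
    public final long end; // exclusive
    public final boolean terminal;

    public Win(long start, long end, boolean terminal) {
      this.start = start;
      this.end = end;
      this.terminal = terminal;
    }

    /** Same convention as Beam's IntervalWindow: the last millisecond inside the window. */
    public long maxTimestamp() {
      return end - 1;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")" + (terminal ? " terminal" : "");
    }
  }

  /**
   * Merge rule from the thread (assumes a non-empty list): start is the earliest start; if any
   * window is terminal, a terminal window dictates the end ("merge down"), otherwise the latest
   * end wins. Assumption: with several terminal windows, the earliest terminal end is used.
   */
  public static Win merge(List<Win> toMerge) {
    long start = Long.MAX_VALUE;
    long maxEnd = Long.MIN_VALUE;
    long terminalEnd = Long.MAX_VALUE;
    boolean anyTerminal = false;
    for (Win w : toMerge) {
      start = Math.min(start, w.start);
      maxEnd = Math.max(maxEnd, w.end);
      if (w.terminal) {
        anyTerminal = true;
        terminalEnd = Math.min(terminalEnd, w.end);
      }
    }
    return new Win(start, anyTerminal ? terminalEnd : maxEnd, anyTerminal);
  }

  /** Invariant (II): every element timestamp in the window must be <= maxTimestamp(). */
  public static boolean satisfiesInvariantII(Win window, long... elementTimestamps) {
    for (long ts : elementTimestamps) {
      if (ts > window.maxTimestamp()) {
        return false;
      }
    }
    return true;
  }

  /** Half-open interval overlap, the usual condition for two session windows to merge. */
  public static boolean overlaps(Win a, Win b) {
    return a.start < b.end && b.start < a.end;
  }

  public static void main(String[] args) {
    Win w1 = new Win(0, 100, false);
    Win w2 = new Win(50, 75, true);
    Win w3 = merge(List.of(w1, w2));
    System.out.println(w3); // [0, 75) terminal

    // (II) holds if every contributing element is at or before 74...
    System.out.println(satisfiesInvariantII(w3, 10, 50)); // true
    // ...but an element at 90 from W1 would now lie past maxTimestamp() = 74.
    System.out.println(satisfiesInvariantII(w3, 10, 50, 90)); // false

    // Order dependence: W0 overlaps W1, but not the merged-down W3.
    Win w0 = new Win(80, 120, false);
    System.out.println(overlaps(w1, w0)); // true: W0 merges if it arrives before W2
    System.out.println(overlaps(w3, w0)); // false: W0 stays separate if it arrives after W2
  }
}
```

Suppose W0 is merged with W1 first and the terminal window arrives afterwards. Then every element W0 holds (all at timestamp 80 or later) would violate (II). This is one concrete way to read Kenn's point that an "end of session" event must not contradict a merge that has already happened.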
