For future reference (for anyone searching the mailing list for this or a similar issue), the ticket Kenneth pointed to (https://issues.apache.org/jira/browse/BEAM-654) is precisely my use case: "session windows with a terminal/stop event". For now I've opted to use the State API (https://beam.apache.org/blog/2017/02/13/stateful-processing.html) to meet my rather specialized windowing needs, as it yields a straightforward implementation that I can prove correct (and for my use cases, distributing the windowing logic for a key is not a performance win).
However, I'm very interested in seeing how the windowing merge semantics get nailed down and evolve, and in exploring what kind of innovative things could ultimately be done with them.

On Fri, Jan 10, 2020 at 4:38 PM Aaron Dixon <[email protected]> wrote:

> Once again this is a great help, thank you Kenneth
>
> On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles <[email protected]> wrote:
>
>> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
>> Your invariants are right. In fact, the Nexmark queries have auctions that
>> truncate in a similar way. This prompted
>> https://issues.apache.org/jira/browse/BEAM-654. I think we have not
>> really nailed down the right spec for merging, and we certainly aren't
>> enforcing it. To be robust, your merging should be associative and
>> commutative, which means that you can't have an "end of session" event that
>> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
>> window functions that split... it is mostly unexplored, semantically.
>>
>> About the error, this may help debug: The "state address windows" for a
>> given merged window are all the windows that contribute to it. This means
>> that when windows A and B merge to become a window AB, we can leave the
>> accumulated state stored with A and B and just note that when we read from
>> AB we actually have to read from both A and B*. So suppose windows A and B
>> are about to merge. Before merge, the state address window map is:
>>
>> A -> [A]
>> B -> [B]
>>
>> After merge, there is a new window AB and the "window to state address
>> window" mapping is:
>>
>> AB -> [A, B]
>>
>> The error means that there is more than one merged window that will read
>> data from a pre-merged window. So there is a situation like
>>
>> AB -> [A, B]
>> BC -> [B, C]
>>
>> This is not intended to happen. It would be the consequence of B merging
>> into two different new windows. Hence it is an internal error. Most likely
>> a bug or a mismatch based on the assumptions. Note that this code/logic is
>> shared by all runners. I do think you can write a WindowFn that induces it.
>>
>> Kenn
>>
>> *this was intended to be a performance optimization, but eagerly copying
>> the data turned out faster, so now it is a legacy compatibility thing that
>> we could remove, I think, but changing this code is tricky
>>
>> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon <[email protected]> wrote:
>>
>>> What I'm attempting is a variation on Session windows in which there may
>>> exist a "terminal" element in the stream that immediately stops the session
>>> (or perhaps stops it after some configured delay).
>>>
>>> My implementation behaves just like Sessions until any such "terminal"
>>> element is encountered, in which case I mark the window as "terminal" and
>>> all windows "merge down" such that any terminal windows get to dictate the
>>> Interval.end()/Window.maxTimestamp().
>>>
>>> So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>>> terminal = true] then the merged result will be W3 [0, 75).
>>>
>>> I've been successful doing this so far, but I've been inferring some
>>> invariants about windows that I'm not sure are official or documented
>>> anywhere.
>>>
>>> The invariants that I've inferred go like this:
>>>
>>> (I) Definition. An element is "in" window W if it originated in W or in
>>> a window that was merged into W (recursively).
>>>
>>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>>> W.maxTimestamp().
>>>
>>> So far, I think this is obvious and true stuff (I hope). (It would
>>> actually be great if there were a way for II to not have to hold,
>>> but that is a separate discussion, I think.)
>>>
>>> The main invariant I'm trying to formalize is one that allows me to
>>> "merge down" -- i.e., to merge in such a way that the merged window's
>>> (mergedResult's) maxTimestamp *is less than* the maxTimestamp of one of
>>> the source (toBeMerged) windows.
>>>
>>> The (undocumented?) invariant I've been working from goes something like
>>> this:
>>>
>>> (III) Corollary. Windows W1 and W2 can merge such that either window's
>>> maxTimestamp() is regressed (moved backward in time, aka "merge down") in
>>> the merged window -- however, they cannot merge such that (II) is ever
>>> violated.
>>>
>>> Is this correct?
>>>
>>> (If this can be confirmed, I'll go back and ensure I'm not
>>> violating the merge() precondition and these invariants and post some code
>>> if needed.) Thank you for your assistance here!
>>>
>>>
>>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Have you used Dataflow's update feature on this pipeline? Also, do
>>>> you have the code for your WindowFn?
>>>>
>>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <[email protected]> wrote:
>>>>
>>>>> Dataflow. (See stacktrace)
>>>>>
>>>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <[email protected]> wrote:
>>>>>>
>>>>>>> I get an IllegalStateException "<window> is in more than one state
>>>>>>> address window set" (stacktrace below).
>>>>>>>
>>>>>>> What does this mean? What invariant of custom window implementation
>>>>>>> & merging am I violating?
>>>>>>>
>>>>>>> Thank you for any advice.
>>>>>>>
>>>>>>> ```
>>>>>>> java.lang.IllegalStateException: {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more than one state address window set
>>>>>>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:588)
>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants (MergingActiveWindowSet.java:334)
>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist (MergingActiveWindowSet.java:88)
>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist (ReduceFnRunner.java:380)
>>>>>>> at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>>>> ...
>>>>>>> ```
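For anyone trying to picture the "state address window" bookkeeping Kenneth describes in the quoted thread, here is a toy model in plain Python. It is only a sketch of the description in this thread, not Beam's MergingActiveWindowSet: windows are plain strings, and the names `merge_windows` and `check_invariants` are made up for illustration.

```python
def merge_windows(active, to_merge, result):
    """Merge the windows in `to_merge` into `result`.

    `active` maps each active window to its list of state address windows.
    The merged window inherits the state address windows of all its sources,
    and the sources stop being active windows themselves.
    """
    addresses = []
    for window in to_merge:
        addresses.extend(active.pop(window))
    active[result] = addresses


def check_invariants(active):
    """Raise if any state address window is read by more than one window."""
    owner = {}
    for window, addresses in active.items():
        for address in addresses:
            if address in owner:
                raise RuntimeError(
                    f"{address} is in more than one state address window set"
                )
            owner[address] = window


# Before the merge: A -> [A], B -> [B]
active = {"A": ["A"], "B": ["B"]}
merge_windows(active, ["A", "B"], "AB")
check_invariants(active)  # fine: AB -> [A, B]

# The situation the error reports: B contributes to two merged windows.
broken = {"AB": ["A", "B"], "BC": ["B", "C"]}
try:
    check_invariants(broken)
except RuntimeError as err:
    print(err)
```

In this model the broken state can only arise if B is handed to two different merges, which matches Kenneth's reading that the exception points at a window being merged into two different results.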

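The "merge down" rule from the quoted thread, and Kenneth's remark that merging should be associative and commutative, can also be tried out with a small plain-Python sketch. Everything here is an assumption for illustration: integer timestamps, `maxTimestamp` taken as `end - 1`, and the hypothetical names `Win`, `merge_down`, `late_elements` and `merge_in_order`. It is not my actual WindowFn.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Win:
    start: int
    end: int  # exclusive
    terminal: bool = False


def overlaps(a, b):
    return a.start < b.end and b.start < a.end


def merge_down(a, b):
    """Session-style merge, except that terminal windows dictate the end."""
    terminal_ends = [w.end for w in (a, b) if w.terminal]
    end = min(terminal_ends) if terminal_ends else max(a.end, b.end)
    return Win(min(a.start, b.start), end, bool(terminal_ends))


def late_elements(merged, timestamps):
    """Timestamps that would violate invariant (II) in the merged window."""
    return [t for t in timestamps if t > merged.end - 1]


def merge_in_order(windows):
    """Feed windows in one at a time, merging whatever overlaps on arrival."""
    active = []
    for current in windows:
        merged_any = True
        while merged_any:
            merged_any = False
            for w in active:
                if overlaps(w, current):
                    current = merge_down(w, current)
                    active.remove(w)
                    merged_any = True
                    break
        active.append(current)
    return sorted(active, key=lambda w: w.start)


w1, w2, w3 = Win(0, 100), Win(50, 75, terminal=True), Win(80, 120)

print(merge_down(w1, w2))                     # the W3 [0, 75) example
print(late_elements(merge_down(w1, w2), [10, 90]))
print(merge_in_order([w1, w3, w2]))           # one window
print(merge_in_order([w1, w2, w3]))           # two windows
```

In this sketch the two arrival orders give different results: if W1 and the window [80, 120) merge before the terminal window arrives, everything collapses into [0, 75), whereas if the terminal window arrives first, [80, 120) no longer overlaps anything and stays separate. An element at timestamp 90 that originated in W1 also ends up past the merged window's maxTimestamp, which is the case invariant (III) is meant to rule out.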