What I'm attempting is a variation on Session windows in which a "terminal" element in the stream stops the session, either immediately or after some configured delay.
My implementation behaves just like Sessions until a "terminal" element is encountered, in which case I mark the window as "terminal" and all windows "merge down", such that any terminal window gets to dictate the Interval.end()/Window.maxTimestamp(). So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75, terminal = true] then the merged result will be W3 [0, 75).

I've been successful doing this so far, but I've been inferring some invariants about windows that I'm not sure are official or documented anywhere. The invariants that I've inferred go like this:

(I) Definition. An element is "in" window W if it originated in W or in a window that was merged into W (recursively).

(II) Invariant. Any element, e, in window W MUST have e.timestamp <= W.maxTimestamp().

So far, I think this is obvious and true (I hope). (It would actually be great if there were a way for II not to have to hold, but that is a separate discussion, I think.)

The main invariant I'm trying to formalize is one that allows me to "merge down", i.e., to merge in such a way that the merged window's (mergedResult's) maxTimestamp *is less than* the maxTimestamp of one of the source (toBeMerged) windows. The (undocumented?) invariant I've been working from goes something like this:

(III) Corollary. Windows W1 and W2 can merge such that either window's maxTimestamp() is regressed (moved backward in time, aka "merge down") in the merged window; however, they cannot merge such that (II) is ever violated.

Is this correct? (If this can be confirmed, I'll go back and ensure I'm not violating the merge() precondition and these invariants, and post some code if needed.)

Thank you for your assistance here!

On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <[email protected]> wrote:

> Have you used Dataflow's update feature on this pipeline? Also, do
> you have the code for your WindowFn?
>
> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <[email protected]> wrote:
>
>> Dataflow. (See stacktrace)
>>
>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <[email protected]> wrote:
>>
>>> Which runner are you using?
>>>
>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <[email protected]> wrote:
>>>
>>>> I get an IllegalStateException "<window> is in more than one state
>>>> address window set" (stacktrace below).
>>>>
>>>> What does this mean? What invariant of custom window implementation
>>>> & merging am I violating?
>>>>
>>>> Thank you for any advice.
>>>>
>>>> ```
>>>> java.lang.IllegalStateException:
>>>> {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
>>>> than one state address window set
>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>     (Preconditions.java:588)
>>>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
>>>>     (MergingActiveWindowSet.java:334)
>>>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
>>>>     (MergingActiveWindowSet.java:88)
>>>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
>>>>     (ReduceFnRunner.java:380)
>>>>   at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
>>>>     (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>   ...
>>>> ```
>>>>
>>>
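
For reference, here is a minimal sketch of the "merge down" rule and invariant (II) from the top of this message, in plain Java with no Beam dependency. It is not the actual WindowFn from this thread. `Win`, `merge`, and `respectsInvariantII` are hypothetical names used only for illustration. Two points are assumptions, not something stated above: when several terminal windows merge, the earliest terminal end wins, and the merged window is itself marked terminal. `maxTimestamp()` is modeled as `end - 1` on the assumption that timestamps are in milliseconds and `end` is exclusive.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the "merge down" rule; not a Beam WindowFn. */
final class MergeDownSketch {

  /** Hypothetical stand-in for a custom window: [start, end) plus a terminal flag. */
  static final class Win {
    final long start;
    final long end; // exclusive
    final boolean terminal;

    Win(long start, long end, boolean terminal) {
      this.start = start;
      this.end = end;
      this.terminal = terminal;
    }

    /** Last timestamp inside the window (assumes millisecond granularity). */
    long maxTimestamp() {
      return end - 1;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")" + (terminal ? " terminal" : "");
    }
  }

  /**
   * Merges windows that are already known to overlap. With no terminal window
   * this is the usual session rule (min start, max end). If any source window
   * is terminal, the result ends at the earliest terminal end (assumption).
   */
  static Win merge(List<Win> toBeMerged) {
    long start = Long.MAX_VALUE;
    long maxEnd = Long.MIN_VALUE;
    long terminalEnd = Long.MAX_VALUE;
    boolean terminal = false;
    for (Win w : toBeMerged) {
      start = Math.min(start, w.start);
      maxEnd = Math.max(maxEnd, w.end);
      if (w.terminal) {
        terminal = true;
        terminalEnd = Math.min(terminalEnd, w.end);
      }
    }
    return new Win(start, terminal ? terminalEnd : maxEnd, terminal);
  }

  /** Invariant (II): every element timestamp must be <= merged.maxTimestamp(). */
  static boolean respectsInvariantII(Win merged, long[] elementTimestamps) {
    for (long t : elementTimestamps) {
      if (t > merged.maxTimestamp()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Win w1 = new Win(0, 100, false);
    Win w2 = new Win(50, 75, true);
    Win w3 = merge(Arrays.asList(w1, w2));
    System.out.println(w3); // [0, 75) terminal
    // Elements at 10 and 74 still fit inside the merged window.
    System.out.println(respectsInvariantII(w3, new long[] {10, 74})); // true
    // An element at 90 (valid in W1) would violate (II) after merging down.
    System.out.println(respectsInvariantII(w3, new long[] {10, 90})); // false
  }
}
```

The last case is the one (III) has to rule out: merging down is only safe if no element already in a source window has a timestamp past the new maxTimestamp.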
