Yeah, none of the state flavors will be merged. Merging used to be done automatically and lazily, but someone removed the automation and made it eager, I think for performance reasons. It is now done somewhat manually in triggering, and all the code is at https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java

All that needs to happen is to invoke this (and the window merging logic) in the same step that is running the DoFn. There's only a ticket for Dataflow: https://issues.apache.org/jira/browse/BEAM-2507

Kenn

On Wed, Sep 27, 2017 at 5:53 PM, Thomas Weise <[email protected]> wrote:

> Just to confirm: As of now none of the state flavors will be merged in a session window, even when that could be done automatically, such as for CombiningState (GroupingState)?
>
> I'm testing this with DirectRunner and no merging occurs.
>
> Is there an existing JIRA for this?
>
> Thanks,
> Thomas
>
> On Thu, Sep 14, 2017 at 9:50 AM, Kenneth Knowles <[email protected]> wrote:
>
> > TL;DR: GroupingState should be just coding work to automate, while merging ValueState probably needs a design doc
> >
> > You can automatically merge GroupingState, though it is merely implied - if all the user can do is add(datum) then the definition of merged state is necessarily equivalent to calling add() with all the data added to anything being merged.
> >
> > We could also consider an explicit @Merge method, but it is more likely to violate associativity (just accidentally) and we are just a bit further from a good implementation. There are design decisions like whether to do it lazily or eagerly, how the runner decides, how we round trip through the SDK harness, etc.
> >
> > Kenn
> >
> > On Wed, Sep 13, 2017 at 10:14 PM, Thomas Weise <[email protected]> wrote:
> >
> > > Since how the state is mutated depends on the DoFn implementation, wouldn't this require the DoFn author to supply the merge logic?
> > >
> > > Something like
> > >
> > > @Merge
> > > merge(@StateId("myCount") ValueState<Integer> merged, @StateId("myCount") ValueState<Integer>[] src)
> > >
> > > Without the support in Beam, what workaround could be suitable to mimic the effect of a session window with stateful DoFn in user land? Perhaps using the global window and managing the gap in the DoFn? The problem is that garbage collection would not occur, with unlimited growth of state as a result.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Sat, Sep 9, 2017 at 3:10 PM, Kenneth Knowles <[email protected]> wrote:
> > >
> > > > It is likely that this may be runner specific, but may fit into the StatefulDoFnRunner. The approach of adding a tweaked GBK under the hood (what the DirectRunner and Dataflow do) won't work; the ParDo needs to re-run window merging on its own state. Also, I don't have a ready answer for timers.
> > > >
> > > > On Fri, Sep 8, 2017 at 4:22 PM, Reuven Lax <[email protected]> wrote:
> > > >
> > > > > Of course if you want to help make stateful DoFn work with session windows, I'm sure that would be much appreciated :)
> > > > >
> > > > > On Fri, Sep 8, 2017 at 4:12 PM, Thomas Weise <[email protected]> wrote:
> > > > >
> > > > > > Thanks, this first of all helped me find a bug in my test (assigning the timestamps for the Create.of(...) case consistent with TestStream). After this the result is as per what you suggest:
> > > > > >
> > > > > > Expected result with global or fixed window and stateful DoFn is not working with session window, unless elements have identical timestamp.
> > > > > >
> > > > > > Thanks,
> > > > > > Thomas
> > > > > >
> > > > > > On Fri, Sep 8, 2017 at 1:52 PM, Reuven Lax <[email protected]> wrote:
> > > > > >
> > > > > > > I believe that stateful DoFn does not yet work with merging windows. This is an open bug in Beam that should be fixed.
> > > > > > >
> > > > > > > On Fri, Sep 8, 2017 at 12:41 PM, Thomas Weise <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Is there a known limitation in TestStream that causes stateful DoFn not to work with windows (state is not tracked)?
> > > > > > > >
> > > > > > > > The pipeline is:
> > > > > > > >
> > > > > > > > TestStream (or Create.of(...)) -> Session Window -> MapElements -> Stateful DoFn
> > > > > > > >
> > > > > > > > With TestStream, only when I remove the session window then state tracks. Session window functionality seems to work with other transforms though.
> > > > > > > >
> > > > > > > > With Create.of(...), pipeline including the window produces expected result.
> > > > > > > >
> > > > > > > > Just checking first.. I can extract relevant pieces into a gist if what I'm trying is supposed to work.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Thomas
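
For readers following the GroupingState point in the Sep 14 message (merged state is equivalent to calling add() with all the data), here is a rough sketch of that idea in plain Java. This is not Beam code and does not use StateMerging or any Beam API: the class SessionStateMergeSketch, the Session type, and the methods of() and mergeSessions() are hypothetical names made up for illustration, the state is assumed to be a simple sum, and two sessions are assumed to merge only when their intervals strictly overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionStateMergeSketch {

    /** Hypothetical stand-in for one session window and its add-only (sum) state. */
    public static final class Session {
        public final long start; // inclusive
        public final long end;   // exclusive: last element timestamp + gap
        public final long sum;   // accumulator that is only ever built by adding values

        public Session(long start, long end, long sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    /** A single element starts out in its own proto-session [timestamp, timestamp + gap). */
    public static Session of(long timestamp, long gap, long value) {
        return new Session(timestamp, timestamp + gap, value);
    }

    /**
     * Merges overlapping sessions. Because the state is add-only, the merged
     * state is just the sum of the source states, which is the same result as
     * adding every original value to one fresh accumulator.
     */
    public static List<Session> mergeSessions(List<Session> input) {
        List<Session> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong((Session s) -> s.start));

        List<Session> out = new ArrayList<>();
        for (Session s : sorted) {
            int last = out.size() - 1;
            // Assumption: merge only on strict overlap with the previous session.
            if (last >= 0 && s.start < out.get(last).end) {
                Session prev = out.get(last);
                out.set(last, new Session(prev.start, Math.max(prev.end, s.end), prev.sum + s.sum));
            } else {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long gap = 10;
        List<Session> merged = mergeSessions(
            Arrays.asList(of(0, gap, 1), of(5, gap, 2), of(30, gap, 4)));
        for (Session s : merged) {
            System.out.println("[" + s.start + ", " + s.end + ") sum=" + s.sum);
        }
    }
}
```

With a gap of 10, the elements at timestamps 0 and 5 fall into one merged session and their states are combined, while the element at 30 keeps its own state. A ValueState has no such implied rule, which is why the thread suggests it would need either a user-supplied merge or a design doc.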
