TL;DR: Automatically merging GroupingState should be just coding work, while merging ValueState probably needs a design doc.
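As the TL;DR says, merging add-only grouping state is mechanical, while a ValueState has no merge implied by its interface. The following is a minimal Java sketch of that distinction under stated assumptions. SimpleBag is a hypothetical in-memory stand-in, not Beam's BagState or GroupingState. Its only mutation is add(), so merging several bags is fully determined by re-adding all of their elements. For a value-like state, a user-supplied merge function is needed. The sum and average lambdas are hypothetical examples of such a function. isAssociativeOn is a hypothetical helper that shows how easily one can accidentally break associativity. One passing triple does not prove associativity, but one failing triple disproves it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class StateMergeSketch {

    /** Hypothetical stand-in for grouping state: the user can only add() and read(). */
    public static final class SimpleBag<T> {
        private final List<T> elements = new ArrayList<>();

        public void add(T value) { elements.add(value); }

        public List<T> read() { return new ArrayList<>(elements); }

        /** Implied merge: re-add every element of every source bag into a fresh bag. */
        @SafeVarargs
        public static <T> SimpleBag<T> merge(SimpleBag<T>... sources) {
            SimpleBag<T> merged = new SimpleBag<>();
            for (SimpleBag<T> source : sources) {
                for (T value : source.elements) {
                    merged.add(value);
                }
            }
            return merged;
        }
    }

    /**
     * Returns false if (a, b, c) is a counterexample to associativity of merge.
     * Returning true for one triple does NOT prove the function is associative.
     */
    public static <T> boolean isAssociativeOn(BinaryOperator<T> merge, T a, T b, T c) {
        T leftFirst = merge.apply(merge.apply(a, b), c);
        T rightFirst = merge.apply(a, merge.apply(b, c));
        return leftFirst.equals(rightFirst);
    }

    public static void main(String[] args) {
        // Two windows' worth of bag state, merged by re-adding their contents.
        SimpleBag<Integer> w1 = new SimpleBag<>();
        w1.add(1);
        w1.add(2);
        SimpleBag<Integer> w2 = new SimpleBag<>();
        w2.add(3);
        System.out.println(SimpleBag.merge(w1, w2).read()); // [1, 2, 3]

        // Hypothetical user-written merges for a value-like Integer count.
        BinaryOperator<Integer> sum = Integer::sum;
        BinaryOperator<Integer> average = (x, y) -> (x + y) / 2;
        System.out.println(isAssociativeOn(sum, 1, 2, 7));     // true
        System.out.println(isAssociativeOn(average, 1, 2, 7)); // false: 4 vs 2
    }
}
```

The average case shows the risk: merging windows in a different grouping order silently produces a different result. An implied merge derived from add() cannot have this failure mode.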
You can automatically merge GroupingState, though the merge is only implied. If all the user can do is add(datum), then the merged state is necessarily equivalent to calling add() with all the data that was added to each of the states being merged. We could also consider an explicit @Merge method, but it is more likely to violate associativity (just accidentally), and we are a bit further from a good implementation. There are design decisions, such as whether to merge lazily or eagerly, how the runner decides, how we round-trip through the SDK harness, etc.

Kenn

On Wed, Sep 13, 2017 at 10:14 PM, Thomas Weise wrote:
> Since how the state is mutated depends on the DoFn implementation, wouldn't
> this require the DoFn author to supply the merge logic?
>
> Something like
>
> @Merge
> merge(@StateId("myCount") ValueState<Integer> merged,
>       @StateId("myCount") ValueState<Integer>[] src)
>
> Without the support in Beam, what workaround could be suitable to mimic the
> effect of a session window with a stateful DoFn in user land? Perhaps using
> the global window and managing the gap in the DoFn? The problem is that
> garbage collection would not occur, resulting in unlimited growth of state.
>
> Thanks,
> Thomas
>
> On Sat, Sep 9, 2017 at 3:10 PM, Kenneth Knowles wrote:
>
> > This is likely runner-specific, but it may fit into the
> > StatefulDoFnRunner. The approach of adding a tweaked GBK under the hood
> > (what the DirectRunner and Dataflow do) won't work; the ParDo needs to
> > re-run window merging on its own state. Also, I don't have a ready answer
> > for timers.
> >
> > On Fri, Sep 8, 2017 at 4:22 PM, Reuven Lax wrote:
> >
> > > Of course, if you want to help make stateful DoFn work with session
> > > windows, I'm sure that would be much appreciated :)
> > >
> > > On Fri, Sep 8, 2017 at 4:12 PM, Thomas Weise wrote:
> > >
> > > > Thanks, this first of all helped me find a bug in my test (assigning
> > > > the timestamps for the Create.of(...) case consistently with
> > > > TestStream). After this, the result is as you suggest:
> > > >
> > > > Stateful DoFn produces the expected result with the global or a fixed
> > > > window, but does not work with a session window unless elements have
> > > > identical timestamps.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > > On Fri, Sep 8, 2017 at 1:52 PM, Reuven Lax wrote:
> > > >
> > > > > I believe that stateful DoFn does not yet work with merging windows.
> > > > > This is an open bug in Beam that should be fixed.
> > > > >
> > > > > On Fri, Sep 8, 2017 at 12:41 PM, Thomas Weise wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Is there a known limitation in TestStream that causes stateful DoFn
> > > > > > not to work with windows (state is not tracked)?
> > > > > >
> > > > > > The pipeline is:
> > > > > >
> > > > > > TestStream (or Create.of(...)) -> Session Window -> MapElements ->
> > > > > > Stateful DoFn
> > > > > >
> > > > > > With TestStream, state is tracked only when I remove the session
> > > > > > window. Session window functionality seems to work with other
> > > > > > transforms, though.
> > > > > >
> > > > > > With Create.of(...), the pipeline including the window produces the
> > > > > > expected result.
> > > > > >
> > > > > > Just checking first... I can extract the relevant pieces into a gist
> > > > > > if what I'm trying is supposed to work.
> > > > > >
> > > > > > Thanks,
> > > > > > Thomas
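Kenn's point in the thread is that the ParDo itself must re-run window merging on its own state. A small batch sketch in Java can illustrate that idea. Everything here is hypothetical and simplified, and it is not how the DirectRunner, Dataflow, or the StatefulDoFnRunner actually work. Each element gets a proto-session [timestamp, timestamp + gap), similar to what Beam's Sessions window function assigns. Overlapping sessions are then merged after sorting by start. Each session's bag state is merged along with its window, using the implied add-everything merge. Windows that only touch at an endpoint are kept separate. A real runner would have to merge incrementally as elements arrive. This sketch also ignores timers entirely, which the thread leaves as an open question.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionStateMergeSketch {

    /** Hypothetical session window [start, end) in millis, plus the bag state buffered for it. */
    public static final class Session {
        public long start;
        public long end;
        public final List<String> bag = new ArrayList<>();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Gives each element a proto-session [ts, ts + gap), then merges overlapping
     * sessions and merges their bag state together with the windows.
     */
    public static List<Session> assignAndMerge(long[] timestamps, String[] values, long gapMillis) {
        List<Session> protos = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            Session s = new Session(timestamps[i], timestamps[i] + gapMillis);
            s.bag.add(values[i]);
            protos.add(s);
        }
        protos.sort(Comparator.comparingLong((Session s) -> s.start));

        List<Session> merged = new ArrayList<>();
        for (Session s : protos) {
            Session last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && s.start < last.end) {
                // Windows overlap: widen the window and merge the state with it.
                last.end = Math.max(last.end, s.end);
                last.bag.addAll(s.bag);
            } else {
                merged.add(s);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 5_000L, 60_000L};
        String[] vs = {"a", "b", "c"};
        for (Session s : assignAndMerge(ts, vs, 10_000L)) {
            System.out.println("[" + s.start + ", " + s.end + ") -> " + s.bag);
        }
        // [0, 15000) -> [a, b]
        // [60000, 70000) -> [c]
    }
}
```

With a 10-second gap, elements a and b (5 seconds apart) end up sharing one window and one bag, while c stays alone. If the state were left attached to the pre-merge windows instead, a and b would be tracked separately.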
