Since how the state is mutated depends on the DoFn implementation, wouldn't
this require the DoFn author to supply the merge logic?

Something like:

@Merge
public void merge(@StateId("myCount") ValueState<Integer> merged,
    @StateId("myCount") ValueState<Integer>[] src)
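To make the proposed hook concrete, here is a plain-Java model with no Beam dependency. It shows what a user-supplied merge for the "myCount" state would have to do when several session windows collapse into one. It reads every source cell, treats an unset cell as zero (Beam's `ValueState.read()` returns null when nothing has been written), and writes the sum into the merged cell. `Cell` is a hypothetical stand-in for `ValueState<Integer>`, and `mergeCounts` is a sketch of a body for the hypothetical `@Merge` method. Neither is an existing Beam API.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the proposed (hypothetical) @Merge hook; no Beam classes are used.
class CountStateMerge {

    // Hypothetical stand-in for ValueState<Integer>: read() returns null when unset.
    static final class Cell {
        private Integer value;

        Cell(Integer initial) {
            this.value = initial;
        }

        Integer read() {
            return value;
        }

        void write(Integer newValue) {
            value = newValue;
        }

        void clear() {
            value = null;
        }
    }

    // Sketch of a user-supplied merge for "myCount": add every source count
    // (unset counts as zero) to the merged cell, then clear the sources.
    // Assumes the merged cell is not itself one of the sources.
    static void mergeCounts(Cell merged, List<Cell> sources) {
        int total = merged.read() == null ? 0 : merged.read();
        for (Cell src : sources) {
            Integer v = src.read();
            total += (v == null) ? 0 : v;
            src.clear();
        }
        merged.write(total);
    }

    public static void main(String[] args) {
        Cell merged = new Cell(null);
        List<Cell> sources = Arrays.asList(new Cell(2), new Cell(null), new Cell(3));
        mergeCounts(merged, sources);
        System.out.println(merged.read()); // prints 5
    }
}
```

For a counter the merge is a simple sum. Other state types (bags, combining state, arbitrary values) would each need their own rule, which is why the merge logic would probably have to come from the DoFn author.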

Without support in Beam, what workaround could mimic the effect of a
session window with a stateful DoFn in user land? Perhaps use the global
window and manage the gap in the DoFn? The problem is that state in the
global window is never garbage collected, so it would grow without bound.
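Here is a plain-Java model of the global-window workaround, again with no Beam dependency. Per key, it keeps the session start, the last-seen timestamp and a count. A new element more than one gap after the last one closes the session. A "watermark" callback plays the role of an event-time timer set at lastSeen + gap. The important part is that closing a session removes its state, which is what keeps state bounded. In an actual DoFn this would probably map to `ValueState` cells plus a timer declared with `TimerSpecs.timer(TimeDomain.EVENT_TIME)` and handled in an `@OnTimer` method. The class and method names below are hypothetical. The model also assumes that elements of a key arrive in timestamp order. Beam does not guarantee that, so a real version would need to buffer (for example in `BagState`) or tolerate out-of-order data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "global window + gap managed in the DoFn".
// Assumption: elements of a key arrive in timestamp order.
class GlobalWindowSessions {

    // Per-key session state; in Beam this would live in ValueState cells.
    static final class Session {
        final long start;
        long lastSeen;
        int count;

        Session(long start) {
            this.start = start;
            this.lastSeen = start;
        }
    }

    private final long gapMillis;
    private final Map<String, Session> state = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    GlobalWindowSessions(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Per-element logic (the @ProcessElement role).
    void process(String key, long timestampMillis) {
        Session s = state.get(key);
        if (s != null && timestampMillis >= s.lastSeen + gapMillis) {
            close(key, s); // the gap elapsed before the timer fired
            s = null;
        }
        if (s == null) {
            s = new Session(timestampMillis);
            state.put(key, s);
        }
        s.lastSeen = timestampMillis;
        s.count++;
        // In Beam: (re)set an event-time timer for lastSeen + gap here.
    }

    // Timer logic (the @OnTimer role): close sessions whose gap has passed.
    void advanceWatermark(long watermarkMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Session> e : state.entrySet()) {
            if (watermarkMillis >= e.getValue().lastSeen + gapMillis) {
                expired.add(e.getKey());
            }
        }
        for (String key : expired) {
            close(key, state.get(key));
        }
    }

    // Emits the finished session and clears its state, which keeps state bounded.
    private void close(String key, Session s) {
        emitted.add(key + "[" + s.start + "," + (s.lastSeen + gapMillis) + ")=" + s.count);
        state.remove(key);
    }

    List<String> emitted() {
        return emitted;
    }

    int liveKeys() {
        return state.size();
    }

    public static void main(String[] args) {
        GlobalWindowSessions dofn = new GlobalWindowSessions(10);
        dofn.process("a", 0);
        dofn.process("a", 5);
        dofn.process("a", 30);      // gap exceeded: closes a[0,15)=2
        dofn.advanceWatermark(100); // "timer" fires: closes a[30,40)=1
        System.out.println(dofn.emitted()); // [a[0,15)=2, a[30,40)=1]
        System.out.println(dofn.liveKeys()); // 0
    }
}
```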

Thanks,
Thomas



On Sat, Sep 9, 2017 at 3:10 PM, Kenneth Knowles <[email protected]> wrote:

> This will likely be runner-specific, but it may fit into the
> StatefulDoFnRunner. The approach of adding a tweaked GBK under the hood
> (what the DirectRunner and Dataflow do) won't work; the ParDo needs to
> re-run window merging on its own state. Also, I don't have a ready answer
> for timers.
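Re-running window merging on the ParDo's own state might look like the following plain-Java model. Each element contributes an unmerged session proto-window [ts, ts + gap) carrying a count. Overlapping windows are then merged in start order and their counts are summed. This is similar in spirit to how Beam's `Sessions` merges overlapping `IntervalWindow`s, but the classes here are hypothetical and not Beam code. Timers are not modeled, since that question is left open above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of a ParDo re-running session merging on its own state:
// each cell is an unmerged window [start, end) with a count attached.
class MergeOwnState {

    static final class WindowState {
        final long start;
        final long end;
        final int count;

        WindowState(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")=" + count;
        }
    }

    // One proto-window [ts, ts + gap) with count 1 per element timestamp.
    static List<WindowState> protoWindows(long[] timestamps, long gap) {
        List<WindowState> cells = new ArrayList<>();
        for (long ts : timestamps) {
            cells.add(new WindowState(ts, ts + gap, 1));
        }
        return cells;
    }

    // Merges overlapping windows (sorted by start) and sums their counts.
    static List<WindowState> mergeOverlapping(List<WindowState> cells) {
        List<WindowState> sorted = new ArrayList<>(cells);
        sorted.sort(Comparator.comparingLong(w -> w.start));
        List<WindowState> merged = new ArrayList<>();
        WindowState cur = null;
        for (WindowState w : sorted) {
            if (cur != null && w.start < cur.end) {
                // Overlap: widen the current window and combine the state.
                cur = new WindowState(cur.start, Math.max(cur.end, w.end), cur.count + w.count);
            } else {
                if (cur != null) {
                    merged.add(cur);
                }
                cur = w;
            }
        }
        if (cur != null) {
            merged.add(cur);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<WindowState> cells = protoWindows(new long[] {0, 5, 30}, 10);
        System.out.println(mergeOverlapping(cells)); // [[0,15)=2, [30,40)=1]
    }
}
```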
>
> On Fri, Sep 8, 2017 at 4:22 PM, Reuven Lax <[email protected]> wrote:
>
> > Of course, if you want to help make stateful DoFn work with session
> > windows, I'm sure that would be much appreciated :)
> >
> > On Fri, Sep 8, 2017 at 4:12 PM, Thomas Weise <[email protected]> wrote:
> >
> > > Thanks, this first of all helped me find a bug in my test (I now assign
> > > the timestamps for the Create.of(...) case consistently with
> > > TestStream). After this fix, the result is as you suggest:
> > >
> > > The stateful DoFn produces the expected result with global or fixed
> > > windows, but not with session windows unless the elements have
> > > identical timestamps.
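The symptom above can be reproduced with a simplified plain-Java model. The model assumes that, without merging, state is scoped to each element's unmerged window, and it assumes a single key. With fixed windows, nearby timestamps land in the same window and therefore share one state cell. With session windows, each element's proto-window starts at its own timestamp, so distinct timestamps split the state across cells and only identical timestamps share one. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongUnaryOperator;

// Simplified model of the reported symptom: a stateful DoFn keeps one count
// per window (single key assumed) and the windows are never merged.
class StatePerWindow {

    // Counts elements per state cell; windowStartOf maps a timestamp to the
    // start of the window the element was assigned to.
    static Map<Long, Integer> countPerWindow(long[] timestamps, LongUnaryOperator windowStartOf) {
        Map<Long, Integer> cells = new HashMap<>();
        for (long ts : timestamps) {
            cells.merge(windowStartOf.applyAsLong(ts), 1, Integer::sum);
        }
        return cells;
    }

    public static void main(String[] args) {
        long size = 60;
        LongUnaryOperator fixed = ts -> ts - Math.floorMod(ts, size); // fixed windows of length size
        LongUnaryOperator session = ts -> ts; // unmerged proto-window [ts, ts + gap)

        // Same fixed window: one cell with count 3.
        System.out.println(countPerWindow(new long[] {0, 5, 8}, fixed).size()); // 1
        // Distinct timestamps with sessions: state split across 3 cells.
        System.out.println(countPerWindow(new long[] {0, 5, 8}, session).size()); // 3
        // Identical timestamps: identical proto-windows, so one shared cell.
        System.out.println(countPerWindow(new long[] {7, 7, 7}, session).size()); // 1
    }
}
```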
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Fri, Sep 8, 2017 at 1:52 PM, Reuven Lax <[email protected]> wrote:
> > >
> > > > I believe that stateful DoFn does not yet work with merging windows.
> > > > This is an open bug in Beam that should be fixed.
> > > >
> > > > On Fri, Sep 8, 2017 at 12:41 PM, Thomas Weise <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Is there a known limitation in TestStream that causes a stateful DoFn
> > > > > not to work with windows (state is not tracked)?
> > > > >
> > > > > The pipeline is:
> > > > >
> > > > > TestStream (or Create.of(...)) -> Session Window -> MapElements ->
> > > > > Stateful DoFn
> > > > >
> > > > > With TestStream, state is tracked only when I remove the session
> > > > > window. Session window functionality seems to work with other
> > > > > transforms, though.
> > > > >
> > > > > With Create.of(...), the pipeline including the window produces the
> > > > > expected result.
> > > > >
> > > > > Just checking first. I can extract the relevant pieces into a gist if
> > > > > what I'm trying is supposed to work.
> > > > >
> > > > > Thanks,
> > > > > Thomas
> > > > >
> > > >
> > >
> >
>
