Yes, none of the state flavors will be merged. Merging used to be done
automatically and lazily, but someone removed the automation and made it
eager, I think for performance reasons. It is done somewhat manually in
triggering, and all the code is at
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java

All that needs to happen is to invoke this (and the window merging logic)
in the same step that is running the DoFn.
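
To make "same step" concrete, here is a minimal sketch in plain Java. It is
not the StateMerging code and uses no Beam APIs: SessionStateMergeSketch,
WindowState and mergeWindowsAndState are hypothetical names made up for
illustration. It assumes the per-window state is a simple sum (a stand-in
for CombiningState), so that merged state is just the combination of the
source states, and it folds state together at the same moment the
overlapping session windows are merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; none of these types are Beam classes.
public class SessionStateMergeSketch {

  /** A half-open window [start, end) plus a per-window sum standing in for CombiningState. */
  static final class WindowState {
    final long start;
    final long end;
    final long sum;

    WindowState(long start, long end, long sum) {
      this.start = start;
      this.end = end;
      this.sum = sum;
    }
  }

  /** Merges overlapping windows and, in the same pass, combines the state they carry. */
  static List<WindowState> mergeWindowsAndState(List<WindowState> input) {
    List<WindowState> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong(w -> w.start));

    List<WindowState> merged = new ArrayList<>();
    for (WindowState w : sorted) {
      int lastIndex = merged.size() - 1;
      if (lastIndex >= 0 && w.start < merged.get(lastIndex).end) {
        // Overlap: extend the window and fold the source state into the result.
        WindowState last = merged.remove(lastIndex);
        merged.add(new WindowState(last.start, Math.max(last.end, w.end), last.sum + w.sum));
      } else {
        // No overlap: this window starts a new session.
        merged.add(w);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Proto-windows for three elements, assuming a gap of 10 time units.
    List<WindowState> out = mergeWindowsAndState(Arrays.asList(
        new WindowState(0, 10, 1),
        new WindowState(5, 15, 2),
        new WindowState(30, 40, 4)));
    for (WindowState w : out) {
      System.out.println("[" + w.start + ", " + w.end + ") sum=" + w.sum);
    }
  }
}
```

With the inputs in main, the first two windows overlap and collapse into
[0, 15) with sum 3, while [30, 40) keeps its own sum of 4. A ValueState has
no such obvious combination rule, which is why it is the harder case.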

There's only a ticket for Dataflow:
https://issues.apache.org/jira/browse/BEAM-2507

Kenn


On Wed, Sep 27, 2017 at 5:53 PM, Thomas Weise <[email protected]> wrote:

> Just to confirm: As of now none of the state flavors will be merged in a
> session window, even when that could be done automatically, such as for
> CombiningState (GroupingState)?
>
> I'm testing this with DirectRunner and no merging occurs.
>
> Is there an existing JIRA for this?
>
> Thanks,
> Thomas
>
>
> On Thu, Sep 14, 2017 at 9:50 AM, Kenneth Knowles <[email protected]>
> wrote:
>
> > TL;DR: GroupingState should be just coding work to automate, while
> > merging ValueState probably needs a design doc
> >
> > You can automatically merge GroupingState, though it is merely implied -
> > if all the user can do is add(datum) then the definition of merged state
> > is necessarily equivalent to calling add() with all the data added to
> > anything being merged.
> >
> > We could also consider an explicit @Merge method, but it is more likely
> > to violate associativity (just accidentally) and we are just a bit further
> > from a good implementation. There are design decisions like whether to do
> > it lazily or eagerly, how the runner decides, how we round trip through
> > the SDK harness, etc.
> >
> > Kenn
> >
> > On Wed, Sep 13, 2017 at 10:14 PM, Thomas Weise <[email protected]> wrote:
> >
> > > Since how the state is mutated depends on the DoFn implementation,
> > > wouldn't this require the DoFn author to supply the merge logic?
> > >
> > > Something like
> > >
> > > @Merge
> > > merge(@StateId("myCount") ValueState<Integer> merged,
> > >       @StateId("myCount") ValueState<Integer>[] src)
> > >
> > > Without the support in Beam, what workaround could be suitable to mimic
> > > the effect of a session window with stateful DoFn in user land? Perhaps
> > > using the global window and managing the gap in the DoFn? The problem is
> > > that garbage collection would not occur, so state would grow without
> > > bound.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > >
> > > On Sat, Sep 9, 2017 at 3:10 PM, Kenneth Knowles <[email protected]> wrote:
> > >
> > > > It is likely that this may be runner specific, but may fit into the
> > > > StatefulDoFnRunner. The approach of adding a tweaked GBK under the
> > > > hood (what the DirectRunner and Dataflow do) won't work; the ParDo
> > > > needs to re-run window merging on its own state. Also, I don't have a
> > > > ready answer for timers.
> > > >
> > > > On Fri, Sep 8, 2017 at 4:22 PM, Reuven Lax <[email protected]> wrote:
> > > >
> > > > > Of course if you want to help make stateful DoFn work with session
> > > > > windows, I'm sure that would be much appreciated  :)
> > > > >
> > > > > On Fri, Sep 8, 2017 at 4:12 PM, Thomas Weise <[email protected]> wrote:
> > > > >
> > > > > > Thanks, this first of all helped me find a bug in my test
> > > > > > (assigning the timestamps for the Create.of(...) case consistent
> > > > > > with TestStream). After this the result is as per what you suggest:
> > > > > >
> > > > > > The expected result is produced with a global or fixed window; the
> > > > > > stateful DoFn does not work with a session window unless elements
> > > > > > have identical timestamps.
> > > > > >
> > > > > > Thanks,
> > > > > > Thomas
> > > > > >
> > > > > >
> > > > > > On Fri, Sep 8, 2017 at 1:52 PM, Reuven Lax <[email protected]> wrote:
> > > > > >
> > > > > > > I believe that stateful DoFn does not yet work with merging
> > > > > > > windows. This is an open bug in Beam that should be fixed.
> > > > > > >
> > > > > > > On Fri, Sep 8, 2017 at 12:41 PM, Thomas Weise <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Is there a known limitation in TestStream that causes stateful
> > > > > > > > DoFn not to work with windows (state is not tracked)?
> > > > > > > >
> > > > > > > > The pipeline is:
> > > > > > > >
> > > > > > > > TestStream (or Create.of(...)) -> Session Window -> MapElements
> > > > > > > > -> Stateful DoFn
> > > > > > > >
> > > > > > > > With TestStream, state is tracked only when I remove the session
> > > > > > > > window. Session window functionality seems to work with other
> > > > > > > > transforms though.
> > > > > > > >
> > > > > > > > With Create.of(...), the pipeline including the window produces
> > > > > > > > the expected result.
> > > > > > > >
> > > > > > > > Just checking first.. I can extract relevant pieces into a gist
> > > > > > > > if what I'm trying is supposed to work.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Thomas
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
