Thanks for debugging this, Gabor. Indeed a good catch.

I am not so sure about surfacing it in the API, though: it seems very
specific to the session windowing case. I am also wondering whether this
should simply be the default behavior. If a group's windows are already
empty, why not drop the previous states?

On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:

> Hi,
>
> At Ericsson, we are implementing something similar to what the
> SessionWindowing example does:
>
> There are events belonging to phone calls (sessions), and every event
> has a call_id, which tells us which call it belongs to. At the end of
> every call, a large event has to be emitted that contains some
> aggregated information about the call. Furthermore, the events that
> mark the end of the calls don't always reach our system, so the
> sessions have to time out, just like in the example.
>
> Therefore, I have experimented a bit with the SessionWindowing
> example, and there is a problem: The trigger policy objects belonging
> to already terminated sessions are kept in memory, and also
> NotifyOnLastGlobalElement gets called on them. So, the application is
> eating up more and more memory, and is also getting slower.
>
> I understand that Flink can't simply discard all state belonging
> to empty groups, as it has no way of knowing whether the user-supplied
> policy wants to trigger in the future (perhaps based on some state
> collected before it first triggered).
>
> Therefore, I propose the following addition to the API:
> WindowedDataStream would get a method called something like
> dropEmptyGroups, by which the user could tell Flink to automatically
> discard all state belonging to a group when its window becomes empty.
>
> The implementation could look like the following: dropEmptyGroups()
> would set a flag, and at the end of StreamDiscretizer.evict, if the
> flag is true and bufferSize has just become 0, then this
> StreamDiscretizer would be removed from the groupedDiscretizers map of
> GroupedStreamDiscretizer. For this, StreamDiscretizer would need a new
> field, set at creation, holding a reference to the
> GroupedStreamDiscretizer that contains it. If an element later appeared
> in a dropped group, GroupedStreamDiscretizer.makeNewGroup would simply
> run again (but this won't happen in this example). What do you think?
>
> Best regards,
> Gabor
>
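For discussion, here is a rough sketch of the mechanism described in the
quoted proposal, in plain Java. The two classes are simplified stand-ins
named after StreamDiscretizer and GroupedStreamDiscretizer, not the actual
Flink code; groupBecameEmpty, groupCount, the String keys and the
count-based evict are assumptions made only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GroupedStreamDiscretizer: one discretizer per key.
class GroupedDiscretizerSketch {
    private final Map<String, GroupDiscretizerSketch> groupedDiscretizers = new HashMap<>();
    private final boolean dropEmptyGroups; // the flag the proposed API method would set

    GroupedDiscretizerSketch(boolean dropEmptyGroups) {
        this.dropEmptyGroups = dropEmptyGroups;
    }

    // Routes an element to its group, creating the group on first use
    // (and again after the group has been dropped).
    void processElement(String key, String element) {
        groupedDiscretizers.computeIfAbsent(key, this::makeNewGroup).add(element);
    }

    // Evicts up to n elements from the group's buffer, if the group exists.
    void evict(String key, int n) {
        GroupDiscretizerSketch group = groupedDiscretizers.get(key);
        if (group != null) {
            group.evict(n);
        }
    }

    private GroupDiscretizerSketch makeNewGroup(String key) {
        // The child keeps a reference to its parent, as the proposal suggests.
        return new GroupDiscretizerSketch(key, this);
    }

    // Called by a child whose buffer has just become empty.
    void groupBecameEmpty(String key) {
        if (dropEmptyGroups) {
            groupedDiscretizers.remove(key); // policy state for this group can now be collected
        }
    }

    int groupCount() {
        return groupedDiscretizers.size();
    }
}

// Hypothetical stand-in for the per-group StreamDiscretizer.
class GroupDiscretizerSketch {
    private final String key;
    private final GroupedDiscretizerSketch parent;
    private final Deque<String> buffer = new ArrayDeque<>();

    GroupDiscretizerSketch(String key, GroupedDiscretizerSketch parent) {
        this.key = key;
        this.parent = parent;
    }

    void add(String element) {
        buffer.addLast(element);
    }

    void evict(int n) {
        boolean hadElements = !buffer.isEmpty();
        for (int i = 0; i < n && !buffer.isEmpty(); i++) {
            buffer.removeFirst();
        }
        // Notify the parent only when this eviction emptied the buffer.
        if (hadElements && buffer.isEmpty()) {
            parent.groupBecameEmpty(key);
        }
    }
}
```

With the flag set, a group disappears from the map as soon as its buffer
is emptied and is recreated if another element with the same key arrives;
with the flag unset, the map keeps every group it has ever seen, which is
the memory growth reported in the quoted message.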
