Thanks for debugging this, Gabor; indeed a good catch. I am not so sure about surfacing it in the API, though: it seems very specific to the session windowing case. I am also wondering whether this should actually be the default behavior. If the windows of a group are already empty, why not drop its previous state?
On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
> Hi,
>
> At Ericsson, we are implementing something similar to what the
> SessionWindowing example does:
>
> There are events belonging to phone calls (sessions), and every event
> has a call_id, which tells us which call it belongs to. At the end of
> every call, a large event has to be emitted that contains some
> aggregated information about the call. Furthermore, the events that
> mark the end of the calls don't always reach our system, so the
> sessions have to time out, just like in the example.
>
> Therefore, I have experimented a bit with the SessionWindowing
> example, and there is a problem: the trigger policy objects belonging
> to already terminated sessions are kept in memory, and
> NotifyOnLastGlobalElement still gets called on them. So the
> application eats up more and more memory and also gets slower.
>
> I understand that Flink can't simply discard all state belonging to
> empty groups, as it has no way of knowing whether the user-supplied
> policy wants to trigger in the future (perhaps based on some state
> collected before it first triggered).
>
> Therefore, I propose the following addition to the API:
> WindowedDataStream would get a method called something like
> dropEmptyGroups, by which the user could tell Flink to automatically
> discard all state belonging to a group when its window becomes empty.
>
> The implementation could look like this: dropEmptyGroups() would set
> a flag, and at the end of StreamDiscretizer.evict, if the flag is true
> and bufferSize has just become 0, this StreamDiscretizer would be
> removed from the groupedDiscretizers map of GroupedStreamDiscretizer.
> (StreamDiscretizer would need a new field, set at creation, holding a
> reference to the GroupedStreamDiscretizer that contains it.) If an
> element later appeared in a dropped group,
> GroupedStreamDiscretizer.makeNewGroup would simply run again (though
> this won't happen in this example).
>
> What do you think?
>
> Best regards,
> Gabor
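To make the group lifecycle in Gabor's proposal concrete, here is a rough Python model of the flag. This is a sketch, not Flink code: the class names only mirror StreamDiscretizer and GroupedStreamDiscretizer, and `drop_empty_groups`, `process`, `remove_group`, and `end_session` are hypothetical stand-ins. A real discretizer also holds the trigger and eviction policy objects, which is exactly the state that dropping the group would release.

```python
class StreamDiscretizer:
    """Hypothetical, simplified per-group discretizer: buffers one group's elements."""

    def __init__(self, key, owner):
        self.key = key
        self.owner = owner  # back-reference to the containing GroupedStreamDiscretizer
        self.buffer = []

    def store(self, element):
        self.buffer.append(element)

    def evict(self, n):
        before = len(self.buffer)
        del self.buffer[:n]
        # Proposed hook: if the flag is set and the buffer has *just* become
        # empty, remove this discretizer (and its policy state) from the map.
        if self.owner.drop_empty_groups and before > 0 and not self.buffer:
            self.owner.remove_group(self.key)


class GroupedStreamDiscretizer:
    """Hypothetical model: one StreamDiscretizer per key in a dict."""

    def __init__(self, drop_empty_groups=False):
        self.drop_empty_groups = drop_empty_groups  # what dropEmptyGroups() would set
        self.grouped_discretizers = {}

    def make_new_group(self, key):
        discretizer = StreamDiscretizer(key, self)
        self.grouped_discretizers[key] = discretizer
        return discretizer

    def remove_group(self, key):
        self.grouped_discretizers.pop(key, None)

    def process(self, key, element):
        discretizer = self.grouped_discretizers.get(key)
        if discretizer is None:
            # A dropped (or never-seen) group is simply recreated on demand.
            discretizer = self.make_new_group(key)
        discretizer.store(element)
        return discretizer


def end_session(grouped, key):
    """Hypothetical session end/timeout: evict everything buffered for the key."""
    discretizer = grouped.grouped_discretizers[key]
    discretizer.evict(len(discretizer.buffer))


if __name__ == "__main__":
    for flag in (False, True):
        grouped = GroupedStreamDiscretizer(drop_empty_groups=flag)
        for call_id in range(1000):
            grouped.process(call_id, "call event")
            grouped.process(call_id, "call end")
            end_session(grouped, call_id)
        print(flag, len(grouped.grouped_discretizers))
```

Without the flag, the map keeps one entry per call_id ever seen, which matches the memory growth described above; with it, each group disappears once its window is drained and is recreated if an element for it shows up again. Defaulting `drop_empty_groups` to True would correspond to the default-behavior variant, with the caveat Gabor raises: a policy that still intended to fire later would lose its state.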