Hi,
I think one improvement that would avoid this situation is that broadcast
state should probably not be rebroadcast if it was never read.

The problem here is that our program logic prevents this state growth
whenever it accesses the state, but since we don't do that after changing
the name, the state just explodes through the rebroadcasts.
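
To make the growth concrete, here is a minimal sketch in plain Java. It is
not Flink code: the class and method names are made up for illustration, and
it only models my understanding of union redistribution, namely that every
subtask snapshots its local list and every subtask gets the full union back
on restore. The stateIsAccessed flag stands in for our program logic trimming
the state back down after a restore.

```java
public class UnionStateGrowth {

    /**
     * Hypothetical model of union list state size across savepoint/restore cycles.
     *
     * @param initial         elements held by each subtask before the first cycle
     * @param parallelism     number of parallel subtasks
     * @param cycles          number of savepoint/restore cycles
     * @param stateIsAccessed true if user code reads the state after restore and
     *                        trims it back to its initial size (assumed behaviour)
     * @return elements held by each subtask after the given number of cycles
     */
    static long elementsPerSubtask(long initial, int parallelism, int cycles,
                                   boolean stateIsAccessed) {
        long perSubtask = initial;
        for (int i = 0; i < cycles; i++) {
            // Snapshot: every subtask writes its local list, so the savepoint
            // contains parallelism * perSubtask elements in total.
            long snapshotTotal = Math.multiplyExact(perSubtask, (long) parallelism);

            // Restore: union redistribution hands the full list to every subtask.
            perSubtask = snapshotTotal;

            // If the job still registers and reads the state, its own logic
            // can shrink it again. If the name changed, nobody does this.
            if (stateIsAccessed) {
                perSubtask = initial;
            }
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 100;
        for (int cycles = 0; cycles <= 4; cycles++) {
            System.out.println("cycles=" + cycles
                    + " accessed=" + elementsPerSubtask(1, parallelism, cycles, true)
                    + " orphaned=" + elementsPerSubtask(1, parallelism, cycles, false));
        }
    }
}
```

In this model the orphaned state grows by a factor of the parallelism on
every cycle, while the accessed state stays constant.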

What do you think?

Gyula

Stefan Richter <s.rich...@data-artisans.com> wrote (Thu, 22 Feb 2018, 12:26):

> I had a quick look at it, and we could do that, even for RocksDB: the
> method would do a metadata lookup similar to what state registration does,
> remove the metadata, and drop the column family. But until then, there is
> currently no way to completely drop a keyed state.
>
> > On 22.02.2018 at 12:19, Stefan Richter <s.rich...@data-artisans.com> wrote:
> >
> > Right now, I don’t think there is a way of doing that. I don’t think
> > there is anything fundamentally against having a method that drops a
> > state completely, both data and registered metadata. But so far that
> > never existed and it seems nobody ever needed it (or asked for it at
> > least). The closest thing for keyed state is calling the "clear()"
> > method, but the metadata will stick.
> >
> >> On 22.02.2018 at 12:06, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>
> >> Do you have any suggestions on how to completely delete operator and
> >> keyed state?
> >> For operator state this seems to be easy enough, but what about
> >> completely dropping a keyed state?
> >>
> >> Gyula
> >>
> >> Stefan Richter <s.rich...@data-artisans.com> wrote (Thu, 22 Feb 2018, 11:46):
> >>
> >>>
> >>> Hi,
> >>>
> >>> I don’t think that this is a bug, but rather a necessity that comes with
> >>> the (imo questionable) design of allowing lazy state registration. In this
> >>> design, just because a state is *currently* not registered does not mean
> >>> that you can simply drop it. Imagine that your code did *not yet*
> >>> re-register a state, but could still do so in the future. If a
> >>> checkpoint/recovery happens in between, all data for that state would
> >>> suddenly be lost, just because by chance the state was not registered
> >>> "fast enough". As I see it, the proper way is to register the state under
> >>> the same name and clear it if you want to get rid of the data. There is
> >>> currently no call that explicitly drops a state that was once declared,
> >>> and you might make a case that this is a feature to have for the future.
> >>> Then again, we need a general decision about lazy and eager state IMO.
> >>>
> >>> Best,
> >>> Stefan
> >>>
> >>>> On 22.02.2018 at 11:10, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> We have discovered a fairly serious memory leak
> >>>> in DefaultOperatorStateBackend, with broadcast (union) list states.
> >>>>
> >>>> The problem seems to occur when a broadcast state name is changed, in
> >>>> order to drop some state (intentionally).
> >>>>
> >>>> Flink does not drop the "garbage" broadcast state, and keeps
> >>>> snapshotting and broadcasting it, multiplying it exponentially at every
> >>>> savepoint/restore cycle.
> >>>>
> >>>> With high enough parallelism this can easily lead to small states (a few
> >>>> bytes) growing to several gigabytes and more over a few restarts,
> >>>> eventually leading to a very bad crash/restart cycle where TMs OOM within
> >>>> a few seconds.
> >>>>
> >>>> Basically two things seem to be missing: garbage collection of
> >>>> unreferenced operator states (they are eagerly restored into memory),
> >>>> and probably lazy restore would also be nice :)
> >>>>
> >>>> We run Flink 1.4.0, but 1.4.1 seems to be affected as well. We haven't
> >>>> checked the latest master.
> >>>>
> >>>> Could someone please confirm that this behaviour is not as intended?
> >>>>
> >>>> Cheers,
> >>>> Gyula
> >>>
> >>>
> >
>
>
