We have discovered a fairly serious memory leak
in DefaultOperatorStateBackend affecting broadcast (union) list states.
The problem seems to occur when we intentionally change the name of a
broadcast state in order to drop the old state.
Flink does not drop the orphaned ("garbage") broadcast state. Instead it keeps
snapshotting and broadcasting it, so the state multiplies exponentially with
every savepoint/restore cycle.
With high enough parallelism this can easily cause small states (a few
bytes) to grow to several gigabytes or more over a few restarts, eventually
leading to a very bad crash/restart cycle where TMs OOM within a few seconds.
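To illustrate why the growth is exponential, here is a back-of-the-envelope
simulation. It assumes that on every restore each subtask receives the union
of all subtasks' entries for the orphaned state, never clears it, and
snapshots all of it again. The class and method names are made up for this
illustration, and the inputs (10 bytes per subtask, parallelism 100) are
hypothetical, not measurements from our job.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionStateGrowth {

    /**
     * Hypothetical model: returns the total size (bytes) of the orphaned union
     * list state contained in each savepoint, for restore cycles 0..cycles.
     * Assumes constant parallelism and that every subtask starts with
     * bytesPerSubtask of state which is never cleared.
     */
    public static List<Long> totalBytesPerCycle(long bytesPerSubtask, int parallelism, int cycles) {
        List<Long> totals = new ArrayList<>();
        long perSubtask = bytesPerSubtask;
        for (int c = 0; c <= cycles; c++) {
            // Every subtask snapshots everything it holds.
            long total = perSubtask * parallelism;
            totals.add(total);
            // Union redistribution on restore: each subtask receives the whole savepoint.
            perSubtask = total;
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Long> totals = totalBytesPerCycle(10, 100, 4);
        for (int c = 0; c < totals.size(); c++) {
            System.out.println("savepoint after " + c + " restores: " + totals.get(c) + " bytes");
        }
    }
}
```

Under these assumptions the orphaned state grows by a factor of the
parallelism per cycle: 10 bytes per subtask becomes about 1 GB in the
savepoint after three restores and about 100 GB after four.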
Basically, two things seem to be missing: garbage collection of unreferenced
operator states (they are eagerly restored into memory), and probably lazy
restore would also be nice :)
We run Flink 1.4.0, but 1.4.1 seems to be affected as well; we haven't
checked the latest master.
Could someone please confirm that this behaviour is not as intended?