[ https://issues.apache.org/jira/browse/FLINK-21413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiayi Liao updated FLINK-21413:
-------------------------------
Description:
Take #TtlMapState as an example:
{code:java}
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
    Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
    TypeSerializer<TtlValue<UV>> valueSerializer =
        ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
    for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
        if (!expired(e.getValue())) {
            // we have to do the defensive copy to update the value
            unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
        }
    }
    return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
{code}
The returned value will never be null, so the #StateEntry will exist forever,
which leads to a memory leak if the key range of the stream is very large.
Below we can see that 20+ million uncleared TtlStateMaps could take up several
GB of memory.
!image-2021-02-19-11-13-58-672.png!
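A minimal standalone sketch of one possible fix, under stated assumptions: the helpers return null instead of an empty collection once every entry has expired, which, per the description above, is what would let the #StateEntry be dropped. Everything here is hypothetical and simplified: TtlCleanupSketch, its TtlValue stand-in, the fixed ttlMillis/now clock, and getUnexpiredListOrNull are illustration names, not Flink's actual classes or the committed fix, and the serializer-based defensive copy is omitted.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, standalone sketch; these are NOT Flink's real classes.
public class TtlCleanupSketch {

    // Hypothetical stand-in for Flink's TtlValue: a user value plus its last-access time.
    public static final class TtlValue<V> {
        final V userValue;
        final long lastAccessTimestamp;

        public TtlValue(V userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    private final long ttlMillis; // hypothetical TTL in milliseconds
    private final long now;       // fixed "current time" so results are deterministic

    public TtlCleanupSketch(long ttlMillis, long now) {
        this.ttlMillis = ttlMillis;
        this.now = now;
    }

    // An entry is expired once lastAccessTimestamp + ttl is not in the future.
    boolean expired(TtlValue<?> value) {
        return value.lastAccessTimestamp + ttlMillis <= now;
    }

    // Map variant: returns null (instead of an empty map) when nothing survives,
    // so the caller can remove the whole state entry.
    public <K, V> Map<K, TtlValue<V>> getUnexpiredOrNull(Map<K, TtlValue<V>> ttlValue) {
        Map<K, TtlValue<V>> unexpired = new HashMap<>();
        for (Map.Entry<K, TtlValue<V>> e : ttlValue.entrySet()) {
            if (!expired(e.getValue())) {
                // The original code copies the value with its serializer; omitted here.
                unexpired.put(e.getKey(), e.getValue());
            }
        }
        if (unexpired.isEmpty()) {
            return null;
        }
        return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
    }

    // List variant (hypothetical name) with the same null-when-empty contract.
    public <V> List<TtlValue<V>> getUnexpiredListOrNull(List<TtlValue<V>> ttlValues) {
        List<TtlValue<V>> unexpired = new ArrayList<>();
        for (TtlValue<V> v : ttlValues) {
            if (!expired(v)) {
                unexpired.add(v);
            }
        }
        if (unexpired.isEmpty()) {
            return null;
        }
        return ttlValues.size() == unexpired.size() ? ttlValues : unexpired;
    }

    public static void main(String[] args) {
        TtlCleanupSketch sketch = new TtlCleanupSketch(1000L, 5000L);
        Map<String, TtlValue<Integer>> state = new HashMap<>();
        state.put("a", new TtlValue<>(1, 1000L)); // expired: 1000 + 1000 <= 5000
        System.out.println(sketch.getUnexpiredOrNull(state)); // prints "null"
        state.put("b", new TtlValue<>(2, 4500L)); // still alive: 4500 + 1000 > 5000
        System.out.println(sketch.getUnexpiredOrNull(state).keySet()); // prints "[b]"
    }
}
{code}

Running main prints null for the all-expired map and [b] once a fresh entry is added; the existing "return the input unchanged if nothing expired" shortcut is kept as-is.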
was:
Take #TtlMapState as an example:
{code:java}
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
    Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
    TypeSerializer<TtlValue<UV>> valueSerializer =
        ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
    for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
        if (!expired(e.getValue())) {
            // we have to do the defensive copy to update the value
            unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
        }
    }
    return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
{code}
The returned value will never be null, so the #StateEntry will exist forever,
which leads to a memory leak if the key range of the stream is very large.
Below we can see that 20+ million uncleared TtlStateMaps could take up several
GB of memory.
!image-2021-02-19-11-13-38-691.png!
> TtlMapState and TtlListState cannot be cleaned completely with Filesystem StateBackend
> ------------------------------------------------------------------------------------
>
> Key: FLINK-21413
> URL: https://issues.apache.org/jira/browse/FLINK-21413
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.9.0
> Reporter: Jiayi Liao
> Priority: Major
> Attachments: image-2021-02-19-11-13-58-672.png
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)