Mike Kaplinskiy created FLINK-12785:
---------------------------------------
Summary: RocksDB savepoint recovery can use a lot of unmanaged memory
Key: FLINK-12785
URL: https://issues.apache.org/jira/browse/FLINK-12785
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Reporter: Mike Kaplinskiy
I'm running an application that's backfilling data from Kafka. There are
approximately 3 years' worth of data with a lot of watermark skew (i.e. new
partitions were created over time), and I'm using daily windows. This makes a
lot of the windows buffer their contents before the watermark catches up to
"release" them. In turn, this gives me a lot of in-flight windows (200-300)
with very large state keys in RocksDB (on the order of 40-50 MB per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when
appends happen, but everything works. The problem comes when doing a savepoint
restore - specifically, the taskmanagers eat RAM until the kernel kills them
for being out of memory. The extra memory isn't JVM heap, since the memory
usage of the process is ~4x the -Xmx value and there aren't any
{{OutOfMemoryError}} exceptions.
I traced the culprit of the memory growth to
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
. Specifically, while the keys/values are deserialized on the Java heap,
{{RocksDBWriteBatchWrapper}} forwards them to RocksDB's {{WriteBatch}}, which
buffers them in unmanaged memory. That isn't an issue in itself, but
{{RocksDBWriteBatchWrapper}} flushes based only on the number of records - not
the number of bytes in flight. Specifically, {{RocksDBWriteBatchWrapper}} will
flush only once it has 500 records, and at 40 MB per key that's at least 20 GB
of unmanaged memory buffered before a flush.
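For context, a simplified sketch of that count-only flush behaviour (the class
and field names here only approximate the real {{RocksDBWriteBatchWrapper}};
this is not the actual Flink source):

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Simplified sketch of the current behaviour: flushes are triggered by record
// count alone, regardless of how many bytes are sitting in the native batch.
class CountOnlyWriteBatchWrapper implements AutoCloseable {

    private final RocksDB db;
    private final WriteOptions options;
    private final WriteBatch batch = new WriteBatch();
    private final int capacity; // e.g. 500 records

    CountOnlyWriteBatchWrapper(RocksDB db, WriteOptions options, int capacity) {
        this.db = db;
        this.options = options;
        this.capacity = capacity;
    }

    void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
        // The key/value bytes are copied into RocksDB's native WriteBatch
        // buffer, i.e. unmanaged (off-heap) memory.
        batch.put(handle, key, value);
        flushIfNeeded();
    }

    private void flushIfNeeded() throws RocksDBException {
        // Only the record count is checked; with 40-50 MB values this can
        // accumulate tens of GB off-heap before a flush happens.
        if (batch.count() >= capacity) {
            flush();
        }
    }

    void flush() throws RocksDBException {
        db.write(options, batch);
        batch.clear();
    }

    @Override
    public void close() {
        batch.close();
    }
}
{code}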
My suggestion would be to add an additional flush criterion to
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g.
flush at 500 records or 5 MB buffered, whichever comes first). This way, large
key writes would be flushed to RocksDB promptly during recovery, or even
during regular writes. I applied this approach and was able to complete a
savepoint restore for my job. That said, I'm not entirely sure what else this
change would impact, since I'm not very familiar with Flink internals.
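A minimal sketch of the proposed change, replacing {{flushIfNeeded()}} in the
sketch above (the 5 MB threshold and the constant name are illustrative
assumptions, not an actual patch):

{code:java}
// Illustrative flush check with an added byte-size criterion. The 5 MB
// default here is an assumption for the example, not a value from Flink.
private static final long DEFAULT_BATCH_SIZE_BYTES = 5 * 1024 * 1024;

private void flushIfNeeded() throws RocksDBException {
    boolean tooManyRecords = batch.count() >= capacity;
    // WriteBatch.getDataSize() reports the bytes currently buffered in the
    // native batch, so large keys trigger a flush well before 500 records.
    boolean tooManyBytes = batch.getDataSize() >= DEFAULT_BATCH_SIZE_BYTES;
    if (tooManyRecords || tooManyBytes) {
        flush();
    }
}
{code}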