[ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-12785:
------------------------------------------
    Release Note: 
Before FLINK-12785, users could run into OOM when restoring a savepoint taken with 
the RocksDB state backend if the state contained very large KV pairs. FLINK-12785 
introduces a byte-size limit in RocksDBWriteBatchWrapper (default: 2 MB); RocksDB's 
WriteBatch is now flushed whenever the buffered data exceeds that limit. The limit 
can be tuned via the `state.backend.rocksdb.write-batch-size` property in 
flink-conf.yaml if needed.
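For example, a flink-conf.yaml entry raising the limit to 4 MB (an illustrative 
value; the key is the one named above, the value uses Flink's memory-size syntax):

{code}
state.backend.rocksdb.write-batch-size: 4 mb
{code}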

> RocksDB savepoint recovery can use a lot of unmanaged memory
> ------------------------------------------------------------
>
>                 Key: FLINK-12785
>                 URL: https://issues.apache.org/jira/browse/FLINK-12785
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Mike Kaplinskiy
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time) and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in RocksDB (on the order of 40-50 MB per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat RAM until the kernel kills them 
> for running out of memory. The extra memory isn't JVM heap, since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
>  . Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards them to RocksDB's {{WriteBatch}}, which 
> buffers them in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes based only on the number of records - not 
> the number of bytes in flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40 MB per key that's at least 20 GB 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criterion to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5 MB buffered). This way, large keys would be flushed to RocksDB 
> immediately, during recovery or even regular writes. I applied this approach and I 
> was able to complete a savepoint restore for my job. That said, I'm not 
> entirely sure what else this change would impact since I'm not very familiar 
> with Flink.
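Below is a minimal sketch of the dual flush criterion described above: flushing 
RocksDB's WriteBatch on either a record-count or a byte-size threshold. This is not 
Flink's actual RocksDBWriteBatchWrapper; the class name, constants and ownership of 
the RocksDB handles are illustrative only, and it assumes RocksDB's Java API 
(WriteBatch#count, WriteBatch#getDataSize, RocksDB#write).

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * Illustrative wrapper (not Flink's RocksDBWriteBatchWrapper): flushes the
 * underlying WriteBatch when either a record-count or a byte-size limit is hit,
 * so a handful of very large KV pairs can no longer pile up in unmanaged memory.
 */
class SizeLimitedWriteBatchWrapper implements AutoCloseable {

    private static final int MAX_RECORDS = 500;                   // existing count-based criterion
    private static final long MAX_BATCH_BYTES = 2L * 1024 * 1024; // 2 MB, mirroring the new default

    private final RocksDB db;                 // owned by the caller
    private final WriteOptions writeOptions;  // owned by the caller
    private final WriteBatch batch = new WriteBatch();

    SizeLimitedWriteBatchWrapper(RocksDB db, WriteOptions writeOptions) {
        this.db = db;
        this.writeOptions = writeOptions;
    }

    void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
        batch.put(handle, key, value);
        // Flush on whichever limit is hit first: the number of buffered records
        // or the number of bytes the batch currently holds (batch.getDataSize()).
        if (batch.count() >= MAX_RECORDS || batch.getDataSize() >= MAX_BATCH_BYTES) {
            flush();
        }
    }

    void flush() throws RocksDBException {
        if (batch.count() > 0) {
            db.write(writeOptions, batch);
            batch.clear();
        }
    }

    @Override
    public void close() throws RocksDBException {
        flush();
        batch.close();
    }
}
{code}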



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
