This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new f31fae4b0f5 [FLINK-39753][state/rocksdb] Close ColumnFamilyOptions from getDescriptor() in Compactor
f31fae4b0f5 is described below
commit f31fae4b0f5eb922898880e5dfe2a99a14359a3b
Author: Keith Lee <[email protected]>
AuthorDate: Sun May 31 18:48:43 2026 +0100
[FLINK-39753][state/rocksdb] Close ColumnFamilyOptions from getDescriptor() in Compactor
ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions
on every call. Compactor.compact() never closed it, so the native object kept
its reference to the shared block cache and prevented the cache from being
freed. Wrap the call in try-with-resources so the options are closed right
after reading numLevels().
---
.../apache/flink/contrib/streaming/state/sstmerge/Compactor.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
index 8471182e759..cf00e5317a3 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state.sstmerge;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions;
import org.rocksdb.RocksDB;
@@ -51,7 +52,13 @@ class Compactor {
}
void compact(ColumnFamilyHandle cfName, int level, List<String> files)
throws RocksDBException {
- int outputLevel = Math.min(level + 1,
cfName.getDescriptor().getOptions().numLevels() - 1);
+ // FLINK-39753: getDescriptor() allocates a new native
ColumnFamilyOptions on every call.
+ // Closing it is required to release the native object and its
reference to the shared
+ // block cache; leaking it keeps the LRUCache alive and grows native
memory until OOM.
+ final int outputLevel;
+ try (ColumnFamilyOptions cfOptions =
cfName.getDescriptor().getOptions()) {
+ outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);
+ }
LOG.debug(
"Manually compacting {} files from level {} to {}: {}",
files.size(),