[
https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Keith Lee updated FLINK-39753:
------------------------------
Description:
h3. Summary
In [Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54], ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions object on every JNI call, and Compactor never closes it (a similar class of bug to FLINK-21986).
This causes two leaks:
1. The native objects behind each returned descriptor.
2. The entire LRUCache, which leaks even after all tasks on the TaskManager have stopped. Once the compactor has run (i.e. compacted SST files at levels above L0), the native memory used by the LRUCache is never freed, because these descriptors transitively hold a reference to it. More precisely, each one increments the reference count of a shared pointer, so the count never drops to zero and the older shared block cache cannot be cleaned up.
The full chain by which the LRUCache leaks, with file paths and line numbers:
{quote}Compactor.compact()
flink-state-backends/.../sstmerge/Compactor.java:58
│ cfName.getDescriptor().getOptions()
▼
ColumnFamilyHandle.getDescriptor() → JNI call
frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
│
▼
Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
frocksdb/java/rocksjni/columnfamilyhandle.cc:52
│ cfh->GetDescriptor(&desc)
│ → ColumnFamilyDescriptorJni::construct(env, &desc)
▼
ColumnFamilyDescriptorJni::construct()
frocksdb/java/rocksjni/portal.h:6602
│ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
▼
ColumnFamilyOptionsJni::construct()
frocksdb/java/rocksjni/portal.h:2884
│ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY CONSTRUCTOR
▼
ColumnFamilyOptions (the copy)
frocksdb/include/rocksdb/options.h:303
│ std::shared_ptr<TableFactory> table_factory; ← shared_ptr copied, refcount++
▼
BlockBasedTableFactory (same instance, not a copy)
frocksdb/table/block_based/block_based_table_factory.h:93
│ BlockBasedTableOptions table_options_; ← value member of the factory
▼
BlockBasedTableOptions
frocksdb/include/rocksdb/table.h:266
│ std::shared_ptr<Cache> block_cache; ← the LRUCache
▼
LRUCache (the single shared instance)
flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
{quote}
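To make the reference-counting argument concrete, here is a toy Java model; it is not RocksJava code. The hypothetical NativeCache class stands in for the shared LRUCache, and the hypothetical OptionsCopy class for each ColumnFamilyOptions copy returned through getDescriptor(). As a simplification, it collapses the table_factory → table_options_ → block_cache hops shown above into a single reference count.

```java
/**
 * Toy model of the leak, not RocksJava code. NativeCache stands in for the shared LRUCache and
 * OptionsCopy for a ColumnFamilyOptions copy returned through getDescriptor(). The
 * table_factory -> table_options_ -> block_cache hops are collapsed into one reference count.
 */
public class SharedCacheLeakModel {

    /** Stand-in for the shared block cache: freed only when its reference count reaches zero. */
    static final class NativeCache {
        private int refCount = 1; // the reference held by the state backend itself
        private boolean freed = false;

        void retain() {
            refCount++;
        }

        void release() {
            refCount--;
            if (refCount == 0) {
                freed = true; // this is where the native memory would be returned
            }
        }

        int refCount() {
            return refCount;
        }

        boolean isFreed() {
            return freed;
        }
    }

    /** Stand-in for a copied ColumnFamilyOptions: the copy bumps the shared count. */
    static final class OptionsCopy implements AutoCloseable {
        private final NativeCache cache;
        private boolean closed = false;

        OptionsCopy(NativeCache cache) {
            this.cache = cache;
            cache.retain(); // mirrors copying a std::shared_ptr (refcount++)
        }

        int numLevels() {
            return 7; // arbitrary value; only the reference counting matters here
        }

        @Override
        public void close() {
            if (!closed) { // idempotent, so a double close cannot over-release
                closed = true;
                cache.release();
            }
        }
    }

    /** Runs the given number of compactions, then drops the backend's own reference. */
    static NativeCache simulate(int compactions, boolean closeCopies) {
        NativeCache cache = new NativeCache();
        for (int i = 0; i < compactions; i++) {
            if (closeCopies) {
                // Patched behaviour: try-with-resources releases the copy after numLevels().
                try (OptionsCopy copy = new OptionsCopy(cache)) {
                    copy.numLevels();
                }
            } else {
                // Original behaviour: the copy is read once and never closed.
                new OptionsCopy(cache).numLevels();
            }
        }
        cache.release(); // e.g. all tasks on the TaskManager have stopped
        return cache;
    }

    public static void main(String[] args) {
        NativeCache leaky = simulate(3, false);
        NativeCache fixed = simulate(3, true);
        System.out.println("without close: refCount=" + leaky.refCount() + ", freed=" + leaky.isFreed());
        System.out.println("with close:    refCount=" + fixed.refCount() + ", freed=" + fixed.isFreed());
    }
}
```

After three compactions, the unclosed path ends with refCount=3 and freed=false even though the backend has released its own reference. The closed path returns to zero and frees the cache.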
h3. Verification of OOMKill and fix
The fix is to close the ColumnFamilyOptions object after reading numLevels() from it:
{noformat}
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.rocksdb.sstmerge;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionJobInfo;
 import org.rocksdb.CompactionOptions;
 import org.rocksdb.RocksDB;
@@ -51,7 +52,15 @@ class Compactor {
     }
 
     void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException {
-        int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1);
+        // ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every
+        // call (it copies the column family's options across JNI) and does not close it. Leaking
+        // it also bumps the reference count on the shared block cache's shared_ptr, preventing the
+        // cache from ever being freed (same class of leak as FLINK-21986). Close it once we have
+        // read numLevels().
+        final int outputLevel;
+        try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) {
+            outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);
+        }
         LOG.debug(
                 "Manually compacting {} files from level {} to {}: {}",
                 files.size(),
{noformat}
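If other call sites need the same treatment, the try-with-resources pattern from the patch could be factored into a small helper. The sketch below is hypothetical: neither NativeReads nor readAndClose exists in Flink or RocksJava, and FakeOptions only stands in for ColumnFamilyOptions so the example runs without the RocksDB native library.

```java
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink or RocksJava): open a native-backed AutoCloseable,
 * read a single value from it, and close it on every path.
 */
public final class NativeReads {

    private NativeReads() {}

    /** Like Supplier, but may throw (e.g. a JNI call that throws RocksDBException). */
    @FunctionalInterface
    public interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** try-with-resources closes the resource whether read.apply returns or throws. */
    public static <T extends AutoCloseable, R> R readAndClose(
            ThrowingSupplier<T> open, Function<T, R> read) throws Exception {
        try (T resource = open.get()) {
            return read.apply(resource);
        }
    }

    /** Stand-in for ColumnFamilyOptions that records whether close() was called. */
    static final class FakeOptions implements AutoCloseable {
        boolean closed = false;

        int numLevels() {
            return 7;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) throws Exception {
        FakeOptions opts = new FakeOptions();
        int level = 6;
        // Same computation as the patched Compactor.compact(), routed through the helper.
        int outputLevel = readAndClose(() -> opts, o -> Math.min(level + 1, o.numLevels() - 1));
        System.out.println("outputLevel=" + outputLevel + ", closed=" + opts.closed);
    }
}
```

The trade-off is that close() on a generic AutoCloseable is declared to throw Exception, so callers of the helper must handle a broad checked exception; the direct try-with-resources in the patch avoids that, since compact() only declares RocksDBException.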
Two sets of Flink SQL jobs were run on local clusters, with and without the fix. After 6+ hours, 8 TaskManagers without the fix had a working-set RSS above 3.4 GB; with the fix, the highest RSS was 2.8 GB.
Without closing the ColumnFamilyOptions, a TM with high RSS has ~1.54 GB in ReadBlockContents, exceeding the configured 833 MB capacity.
{noformat}
--- Cumulative View ---
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
Total: 2280777636 B
         0   0.0%   0.0% 2255548014  98.9% je_prof_backtrace
         0   0.0%   0.0% 2255548014  98.9% je_prof_tctx_create
2255548014  98.9%  98.9% 2255548014  98.9% prof_backtrace_impl
         0   0.0%  98.9% 2247187497  98.5% je_malloc_default
         0   0.0%  98.9% 1694877743  74.3% void* fallback_impl
         0   0.0%  98.9% 1655889046  72.6% rocksdb::BlockFetcher::ReadBlockContents
         0   0.0%  98.9% 1573497612  69.0% rocksdb::BlockBasedTable::NewDataBlockIterator
         0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
         0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
         0   0.0%  98.9% 1510010892  66.2% rocksdb::BlockBasedTableIterator::InitDataBlock
         0   0.0%  98.9% 1414778240  62.0% rocksdb::AtomicGroupReadBuffer::Clear
         0   0.0%  98.9% 1097112297  48.1% 0x00007fb693d7cc01
         0   0.0%  98.9% 1097112297  48.1% Java_org_rocksdb_RocksIterator_seek0
         0   0.0%  98.9% 1097112297  48.1% rocksdb::DBIter::Seek
         0   0.0%  98.9% 1095159981  48.0% rocksdb::BlockBasedTableIterator::SeekImpl
         0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::Seek
         0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::SeekImpl
         0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressBlockData
         0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressSerializedBlock
         0   0.0%  98.9%  519018670  22.8% rocksdb::BlockBasedTableIterator::FindBlockForward
         0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::Next
         0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::NextAndGetResult
         0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::Next
         0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::NextAndGetResult
         0   0.0%  98.9%  436500266  19.1% 0x00007fb693d82c67
         0   0.0%  98.9%  436500266  19.1% rocksdb::DBIter::Next
         0   0.0%  98.9%  356557773  15.6% os::malloc@d01a60
         0   0.0%  98.9%  352586303  15.5% Unsafe_AllocateMemory0
         0   0.0%  98.9%  352497944  15.5% 0x00007fb692fe77e0
         0   0.0%  98.9%  142878042   6.3% os::malloc@d01ca0
         0   0.0%  98.9%  136630920   6.0% AllocateHeap@4327f0
         0   0.0%  98.9%  102717059   4.5% __GI___clone3
         0   0.0%  98.9%  102717059   4.5% start_thread
         0   0.0%  98.9%   92210334   4.0% KlassFactory::create_from_stream
...
{noformat}
With the fix, the TM with the highest RSS has ~800 MB in ReadBlockContents, consistent with the configured 833 MB capacity.
{noformat}
--- Cumulative View ---
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
Total: 1416132765 B
         0   0.0%   0.0% 1391278564  98.2% je_prof_backtrace
         0   0.0%   0.0% 1391278564  98.2% je_prof_tctx_create
1391278564  98.2%  98.2% 1391278564  98.2% prof_backtrace_impl
         0   0.0%  98.2% 1382556219  97.6% je_malloc_default
         0   0.0%  98.2%  856531103  60.5% void* fallback_impl
         0   0.0%  98.2%  820714114  58.0% rocksdb::BlockFetcher::ReadBlockContents
         0   0.0%  98.2%  810027091  57.2% rocksdb::BlockBasedTable::NewDataBlockIterator
         0   0.0%  98.2%  809508356  57.2% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
         0   0.0%  98.2%  809438634  57.2% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
         0   0.0%  98.2%  767651473  54.2% rocksdb::BlockBasedTableIterator::InitDataBlock
         0   0.0%  98.2%  734287999  51.9% 0x00007fce45a7b781
         0   0.0%  98.2%  734287999  51.9% Java_org_rocksdb_RocksIterator_seek0
         0   0.0%  98.2%  734287999  51.9% rocksdb::BlockBasedTableIterator::SeekImpl
         0   0.0%  98.2%  734287999  51.9% rocksdb::DBIter::Seek
         0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::Seek
         0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::SeekImpl
         0   0.0%  98.2%  733060136  51.8% rocksdb::AtomicGroupReadBuffer::Clear
         0   0.0%  98.2%  353073142  24.9% os::malloc@d01a60
         0   0.0%  98.2%  349535104  24.7% Unsafe_AllocateMemory0
         0   0.0%  98.2%  349464287  24.7% 0x00007fce455e6160
         0   0.0%  98.2%  136836241   9.7% os::malloc@d01ca0
         0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressBlockData
         0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressSerializedBlock
         0   0.0%  98.2%  130278866   9.2% AllocateHeap@4327f0
         0   0.0%  98.2%  124409905   8.8% rocksdb::BlockBasedTableIterator::FindBlockForward
         0   0.0%  98.2%   91613963   6.5% KlassFactory::create_from_stream
...
{noformat}
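The comparison between the two profiles and the configured capacity can be checked mechanically. The hypothetical JeprofCheck sketch below (not part of jeprof or Flink) parses one line of jeprof --text output, whose columns are flat, flat%, sum%, cum, cum% and symbol, and compares the cumulative bytes for ReadBlockContents with the capacity. It assumes "833MB" means 833 MiB; the conclusion is the same if it means 833,000,000 bytes.

```java
/**
 * Hypothetical check (not part of jeprof or Flink): extract the cumulative bytes for one symbol
 * from a `jeprof --text` line and compare them with the configured cache capacity.
 */
public final class JeprofCheck {

    private JeprofCheck() {}

    /** Columns: flat flat% sum% cum cum% symbol. Returns cum if the symbol matches, else -1. */
    static long cumulativeBytes(String line, String symbol) {
        // Limit 6 keeps symbols containing spaces (e.g. "void* fallback_impl") in one column.
        String[] cols = line.trim().split("\\s+", 6);
        if (cols.length < 6 || !cols[5].equals(symbol)) {
            return -1;
        }
        return Long.parseLong(cols[3]);
    }

    public static void main(String[] args) {
        // Assumption: "833MB" is read as 833 MiB.
        long capacity = 833L * 1024 * 1024;
        String symbol = "rocksdb::BlockFetcher::ReadBlockContents";
        String withoutFix = "0 0.0% 98.9% 1655889046 72.6% " + symbol;
        String withFix = "0 0.0% 98.2% 820714114 58.0% " + symbol;
        for (String line : new String[] {withoutFix, withFix}) {
            long cum = cumulativeBytes(line, symbol);
            System.out.printf("%d B (%.0f MiB), exceeds capacity: %b%n",
                    cum, cum / (1024.0 * 1024.0), cum > capacity);
        }
    }
}
```

On the two ReadBlockContents lines above, the line without the fix exceeds the capacity and the line with the fix does not.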
Reproduction steps that trigger an OOMKill by repeatedly starting and stopping a Flink job are available here:
[https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
(Note: the configuration in this reproduction uses a higher managed memory fraction so that the OOMKill is triggered sooner.)
> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
> Key: FLINK-39753
> URL: https://issues.apache.org/jira/browse/FLINK-39753
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
> Reporter: Keith Lee
> Priority: Critical
>