[ https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Keith Lee updated FLINK-39753:
------------------------------
    Description: 
h3. Summary

In [Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every JNI call, and Compactor never closes it (the same class of bug as FLINK-21986).

This causes two leaks:
1. The native descriptor objects themselves (one ColumnFamilyOptions copy per call).
2. The whole LRUCache, even after all tasks on the task manager have stopped. Once the compactor has run (i.e. compacted SST files above L0), the native memory used by the LRUCache is never freed, because these descriptors transitively hold a reference to it. More precisely, each copy increments the reference count of a shared pointer, so the count never drops to zero and the older shared block cache is never cleaned up.
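Leak 1 can be illustrated with a minimal, hypothetical Java model. It does not use the RocksDB API: NativeOptionsCopy and DescriptorLeakDemo are invented names, and a plain counter stands in for the native allocation made by the C++ copy constructor. The sketch contrasts the original one-line expression, which allocates a copy per call and never closes it, with a try-with-resources variant shaped like the fix below.

```java
// Hypothetical model, NOT the RocksDB Java API: each NativeOptionsCopy stands in for the
// native ColumnFamilyOptions that getDescriptor() copies across JNI on every call.
final class NativeOptionsCopy implements AutoCloseable {
    // Copies allocated but not yet freed (a counter standing in for native memory).
    static int live = 0;

    private final int numLevels;
    private boolean closed = false;

    NativeOptionsCopy(int numLevels) {
        this.numLevels = numLevels;
        live++; // models `new ColumnFamilyOptions(*cfoptions)` on the native side
    }

    int numLevels() {
        return numLevels;
    }

    @Override
    public void close() {
        if (!closed) { // idempotent, so a second close() cannot "double free"
            closed = true;
            live--;
        }
    }
}

final class DescriptorLeakDemo {
    // Same shape as the original expression: a fresh copy per call, never closed.
    static int outputLevelLeaky(int level, int numLevels) {
        return Math.min(level + 1, new NativeOptionsCopy(numLevels).numLevels() - 1);
    }

    // Same shape as the fix: the copy is closed as soon as numLevels() has been read.
    static int outputLevelClosed(int level, int numLevels) {
        try (NativeOptionsCopy opts = new NativeOptionsCopy(numLevels)) {
            return Math.min(level + 1, opts.numLevels() - 1);
        }
    }
}
```

In this model, calling the leaky variant N times leaves N copies live, while the closed variant leaves none; both compute the same output level.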

The full chain by which the LRUCache leaks, with file paths and line numbers:
{quote}Compactor.compact()
  flink-state-backends/.../sstmerge/Compactor.java:58
  │  cfName.getDescriptor().getOptions()
 ▼
ColumnFamilyHandle.getDescriptor()  →  JNI call
  frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
  │
 ▼
Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
  frocksdb/java/rocksjni/columnfamilyhandle.cc:52
  │  cfh->GetDescriptor(&desc)
  │  → ColumnFamilyDescriptorJni::construct(env, &desc)
 ▼
ColumnFamilyDescriptorJni::construct()
  frocksdb/java/rocksjni/portal.h:6602
  │  ColumnFamilyOptionsJni::construct(env, &(cfd->options))
 ▼
ColumnFamilyOptionsJni::construct()
  frocksdb/java/rocksjni/portal.h:2884
  │  auto* cfo = new ColumnFamilyOptions(*cfoptions);    ← C++ COPY CONSTRUCTOR
 ▼
ColumnFamilyOptions (the copy)
  frocksdb/include/rocksdb/options.h:303
  │  std::shared_ptr<TableFactory> table_factory;         ← shared_ptr copied, refcount++
 ▼
BlockBasedTableFactory (same instance, not a copy)
  frocksdb/table/block_based/block_based_table_factory.h:93
  │  BlockBasedTableOptions table_options_;                ← value member of the factory
 ▼
BlockBasedTableOptions
  frocksdb/include/rocksdb/table.h:266
  │  std::shared_ptr<Cache> block_cache;                  ← the LRUCache
 ▼
LRUCache (the single shared instance)
  flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
     new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
{quote}
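The reference-counting part of this chain can be modelled in a few lines of Java. This is a hypothetical sketch, not RocksDB or Flink code: Shared imitates std::shared_ptr (copy() increments the use count, close() decrements it and frees on zero), and CacheChainDemo wires up block cache, table factory and the descriptor's options copy as in the chain above. For simplicity it assumes that, at shutdown, the live column family options and the cache's creator each hold exactly one reference.

```java
// Hypothetical model of std::shared_ptr reference counting; NOT a RocksDB or Flink class.
final class Shared<T> implements AutoCloseable {
    // One control block per managed object, shared by every copy of the pointer.
    private static final class Control<V> {
        final V value;
        final Runnable onFree; // models the destructor releasing whatever the object owns
        int useCount;
        boolean freed;

        Control(V value, Runnable onFree) {
            this.value = value;
            this.onFree = onFree;
        }
    }

    private final Control<T> control;
    private boolean released;

    private Shared(Control<T> control) {
        this.control = control;
        control.useCount++; // every pointer instance holds one reference
    }

    static <U> Shared<U> make(U value, Runnable onFree) {
        return new Shared<>(new Control<>(value, onFree));
    }

    Shared<T> copy() { // copying a shared_ptr: use count + 1
        return new Shared<>(control);
    }

    T get() {
        return control.value;
    }

    boolean isFreed() {
        return control.freed;
    }

    @Override
    public void close() { // destroying a shared_ptr: use count - 1, free on zero
        if (released) {
            return;
        }
        released = true;
        if (--control.useCount == 0) {
            control.freed = true;
            control.onFree.run();
        }
    }
}

final class CacheChainDemo {
    // Returns true if the block cache is freed after shutdown.
    static boolean cacheFreedAfterShutdown(boolean closeDescriptorCopy) {
        Shared<String> blockCache = Shared.make("LRUCache", () -> { });
        // BlockBasedTableOptions inside the factory holds its own reference to the cache.
        Shared<String> cacheRefInFactory = blockCache.copy();
        Shared<String> factory = Shared.make("BlockBasedTableFactory", cacheRefInFactory::close);
        // The live column family's options hold the factory.
        Shared<String> liveOptionsRef = factory.copy();
        factory.close();
        // getDescriptor(): the copied ColumnFamilyOptions copies the table_factory pointer.
        Shared<String> descriptorCopyRef = liveOptionsRef.copy();
        if (closeDescriptorCopy) {
            descriptorCopyRef.close();
        }
        // Shutdown: the column family's options and the creator's cache handle go away.
        liveOptionsRef.close();
        blockCache.close();
        return blockCache.isFreed();
    }
}
```

With the descriptor's copy left open, the factory's use count never reaches zero, so the factory never releases its cache reference and the cache stays allocated. Closing the copy lets the factory and then the cache be freed.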
h3. Verification of OOMKill and fix


The fix is to close the ColumnFamilyOptions object once numLevels() has been read:
 
{noformat}
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.rocksdb.sstmerge;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionJobInfo;
 import org.rocksdb.CompactionOptions;
 import org.rocksdb.RocksDB;
@@ -51,7 +52,15 @@ class Compactor {
     }
 
     void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException {
-        int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1);
+        // ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every
+        // call (it copies the column family's options across JNI) and does not close it. Leaking
+        // it also bumps the reference count on the shared block cache's shared_ptr, preventing the
+        // cache from ever being freed (same class of leak as FLINK-21986). Close it once we have
+        // read numLevels().
+        final int outputLevel;
+        try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) {
+            outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);
+        }
         LOG.debug(
                 "Manually compacting {} files from level {} to {}: {}",
                 files.size(),
{noformat}

Two sets of Flink SQL jobs were run on local clusters. After more than 6 hours, 8 task managers without the fix had a working-set RSS above 3.4 GB; with the fix, the highest RSS was 2.8 GB.

Without closing the ColumnFamilyOptions, a TM with high RSS had ~1.54GB in ReadBlockContents, exceeding the configured capacity of 833MB.
{noformat}
— Cumulative View —
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
Total: 2280777636 B
         0   0.0%   0.0% 2255548014  98.9% je_prof_backtrace
         0   0.0%   0.0% 2255548014  98.9% je_prof_tctx_create
2255548014  98.9%  98.9% 2255548014  98.9% prof_backtrace_impl
         0   0.0%  98.9% 2247187497  98.5% je_malloc_default
         0   0.0%  98.9% 1694877743  74.3% void* fallback_impl
         0   0.0%  98.9% 1655889046  72.6% rocksdb::BlockFetcher::ReadBlockContents
         0   0.0%  98.9% 1573497612  69.0% rocksdb::BlockBasedTable::NewDataBlockIterator
         0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
         0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
         0   0.0%  98.9% 1510010892  66.2% rocksdb::BlockBasedTableIterator::InitDataBlock
         0   0.0%  98.9% 1414778240  62.0% rocksdb::AtomicGroupReadBuffer::Clear
         0   0.0%  98.9% 1097112297  48.1% 0x00007fb693d7cc01
         0   0.0%  98.9% 1097112297  48.1% Java_org_rocksdb_RocksIterator_seek0
         0   0.0%  98.9% 1097112297  48.1% rocksdb::DBIter::Seek
         0   0.0%  98.9% 1095159981  48.0% rocksdb::BlockBasedTableIterator::SeekImpl
         0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::Seek
         0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::SeekImpl
         0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressBlockData
         0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressSerializedBlock
         0   0.0%  98.9%  519018670  22.8% rocksdb::BlockBasedTableIterator::FindBlockForward
         0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::Next
         0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::NextAndGetResult
         0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::Next
         0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::NextAndGetResult
         0   0.0%  98.9%  436500266  19.1% 0x00007fb693d82c67
         0   0.0%  98.9%  436500266  19.1% rocksdb::DBIter::Next
         0   0.0%  98.9%  356557773  15.6% os::malloc@d01a60
         0   0.0%  98.9%  352586303  15.5% Unsafe_AllocateMemory0
         0   0.0%  98.9%  352497944  15.5% 0x00007fb692fe77e0
         0   0.0%  98.9%  142878042   6.3% os::malloc@d01ca0
         0   0.0%  98.9%  136630920   6.0% AllocateHeap@4327f0
         0   0.0%  98.9%  102717059   4.5% __GI___clone3
         0   0.0%  98.9%  102717059   4.5% start_thread
         0   0.0%  98.9%   92210334   4.0% KlassFactory::create_from_stream
...
{noformat}
With the fix, the TM with the highest RSS had ~800MB in ReadBlockContents, consistent with the configured capacity of 833MB.
{noformat}
— Cumulative View —
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
Total: 1416132765 B
         0   0.0%   0.0% 1391278564  98.2% je_prof_backtrace
         0   0.0%   0.0% 1391278564  98.2% je_prof_tctx_create
1391278564  98.2%  98.2% 1391278564  98.2% prof_backtrace_impl
         0   0.0%  98.2% 1382556219  97.6% je_malloc_default
         0   0.0%  98.2%  856531103  60.5% void* fallback_impl
         0   0.0%  98.2%  820714114  58.0% rocksdb::BlockFetcher::ReadBlockContents
         0   0.0%  98.2%  810027091  57.2% rocksdb::BlockBasedTable::NewDataBlockIterator
         0   0.0%  98.2%  809508356  57.2% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
         0   0.0%  98.2%  809438634  57.2% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
         0   0.0%  98.2%  767651473  54.2% rocksdb::BlockBasedTableIterator::InitDataBlock
         0   0.0%  98.2%  734287999  51.9% 0x00007fce45a7b781
         0   0.0%  98.2%  734287999  51.9% Java_org_rocksdb_RocksIterator_seek0
         0   0.0%  98.2%  734287999  51.9% rocksdb::BlockBasedTableIterator::SeekImpl
         0   0.0%  98.2%  734287999  51.9% rocksdb::DBIter::Seek
         0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::Seek
         0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::SeekImpl
         0   0.0%  98.2%  733060136  51.8% rocksdb::AtomicGroupReadBuffer::Clear
         0   0.0%  98.2%  353073142  24.9% os::malloc@d01a60
         0   0.0%  98.2%  349535104  24.7% Unsafe_AllocateMemory0
         0   0.0%  98.2%  349464287  24.7% 0x00007fce455e6160
         0   0.0%  98.2%  136836241   9.7% os::malloc@d01ca0
         0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressBlockData
         0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressSerializedBlock
         0   0.0%  98.2%  130278866   9.2% AllocateHeap@4327f0
         0   0.0%  98.2%  124409905   8.8% rocksdb::BlockBasedTableIterator::FindBlockForward
         0   0.0%  98.2%   91613963   6.5% KlassFactory::create_from_stream
...
{noformat}
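The ReadBlockContents byte counts in the two profiles can be converted as follows to check the ~1.54GB and ~800MB figures against the 833MB capacity. The text does not say whether 833MB means MiB or decimal MB, so this sketch (the class name is hypothetical) uses the stricter unit for each check.

```java
// Unit conversion for the jeprof byte counts quoted above. The 833 capacity comes from the
// text; whether it is MiB or decimal MB is not stated, so each check uses the stricter unit.
final class ReadBlockContentsUnits {
    static final double MB = 1_000_000.0;
    static final double MIB = 1024.0 * 1024.0;
    static final double GIB = 1024.0 * MIB;

    static double toMiB(long bytes) {
        return bytes / MIB;
    }

    static double toGiB(long bytes) {
        return bytes / GIB;
    }

    // Over capacity even if the capacity is read in MiB (the larger unit).
    static boolean exceedsCapacity(long bytes, double capacity) {
        return bytes > capacity * MIB;
    }

    // Within capacity even if the capacity is read in decimal MB (the smaller unit).
    static boolean withinCapacity(long bytes, double capacity) {
        return bytes <= capacity * MB;
    }
}
```

For the two profiles, toGiB(1655889046) is about 1.54 and toMiB(820714114) is about 783, so the run without the fix exceeds 833 under either unit and the run with the fix stays within it.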
Reproduction steps that trigger an OOMKill by starting and stopping a Flink job multiple times are available here:
[https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
(note: the configuration in this reproduction uses a higher managed memory fraction so that the OOMKill is triggered more quickly).



> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
>                 Key: FLINK-39753
>                 URL: https://issues.apache.org/jira/browse/FLINK-39753
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
>            Reporter: Keith Lee
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
