[ 
https://issues.apache.org/jira/browse/FLINK-21726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305647#comment-17305647
 ] 

fanrui commented on FLINK-21726:
--------------------------------

[~yunta] Thanks for your work on the frocksdb project.


Most of our Flink jobs also use Level Compaction. Only some Flink jobs that 
use IntervalJoin use UNIVERSAL Compaction.

Some IntervalJoin Flink jobs have very large state: a single subtask holds 
20 GB. IntervalJoin writes data very frequently, so with Level Compaction the 
write amplification is severe and compactions run very often, resulting in 
high CPU usage and disk I/O. We therefore switched to UNIVERSAL Compaction, 
which saves more than 20% of CPU.

> Fix checkpoint stuck
> --------------------
>
>                 Key: FLINK-21726
>                 URL: https://issues.apache.org/jira/browse/FLINK-21726
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.3, 1.12.2, 1.13.0
>            Reporter: fanrui
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> h1. 1. Bug description:
> When RocksDB creates a checkpoint, it may get stuck in the 
> `WaitUntilFlushWouldNotStallWrites` method.
> h1. 2. Brief analysis of the cause:
> h2. 2.1 Configuration parameters:
>  
> {code:yaml}
> # Flink yaml:
> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
> state.backend.rocksdb.compaction.style: UNIVERSAL
> # Corresponding RocksDB config:
> compaction_style: Universal
> max_write_buffer_number: 4
> min_write_buffer_number_to_merge: 3
> {code}
> A checkpoint is usually very fast. When a checkpoint is executed, 
> `WaitUntilFlushWouldNotStallWrites` is called. If there are 2 immutable 
> MemTables, which is fewer than `min_write_buffer_number_to_merge` (3), they 
> will not be flushed. However, the call still enters this code:
>  
> {code:cpp}
> // Method: GetWriteStallConditionAndCause (db/column_family.cc)
> if (mutable_cf_options.max_write_buffer_number > 3 &&
>     num_unflushed_memtables >=
>         mutable_cf_options.max_write_buffer_number - 1) {
>   return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/column_family.cc#L847]
> The checkpoint therefore assumes that a flush job is pending and waits for 
> it, but no flush is ever scheduled, so it waits forever.
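The following C++ sketch is a hypothetical, simplified model of this hang, not RocksDB code. The names `CfState`, `MemtableLimitDelays`, `CheckpointWouldWait`, `FlushWillBeScheduled` and `CheckpointIsStuck` are invented for illustration. It assumes that the wait loop asks whether one extra MemTable would stall writes (immutable count + 1), and it ignores the early `break` discussed in section 3. With the configuration above and 2 immutable MemTables, the check gives 2 + 1 >= 4 - 1, so the checkpoint waits, while 2 < 3 means no flush is picked to end the wait.

```cpp
#include <cassert>

// Hypothetical, simplified model of the situation described above.
// This is NOT RocksDB code.
struct CfState {
  int max_write_buffer_number;           // 4 in the configuration above
  int min_write_buffer_number_to_merge;  // 3 in the configuration above
  int num_immutable_memtables;           // stands in for cfd->imm()->NumNotFlushed()
};

// Memtable-limit branch of GetWriteStallConditionAndCause, as quoted above.
bool MemtableLimitDelays(int max_write_buffer_number,
                         int num_unflushed_memtables) {
  return max_write_buffer_number > 3 &&
         num_unflushed_memtables >= max_write_buffer_number - 1;
}

// Assumption: the waiter checks whether one MORE memtable would stall
// writes, i.e. it passes the immutable count + 1.
bool CheckpointWouldWait(const CfState& s) {
  return MemtableLimitDelays(s.max_write_buffer_number,
                             s.num_immutable_memtables + 1);
}

// Per the description above: fewer than min_write_buffer_number_to_merge
// immutable memtables are not flushed, so nothing wakes the waiter.
bool FlushWillBeScheduled(const CfState& s) {
  return s.num_immutable_memtables >= s.min_write_buffer_number_to_merge;
}

// Stuck = the checkpoint decides to wait, but no flush will ever come.
bool CheckpointIsStuck(const CfState& s) {
  return CheckpointWouldWait(s) && !FlushWillBeScheduled(s);
}
```

For example, `CheckpointIsStuck(CfState{4, 3, 2})` is true in this model, while 1 immutable MemTable (no wait) or 3 immutable MemTables (a flush is picked) give false.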
> h2. 2.2 Solution:
> Add a restriction: only wait when the number of immutable MemTables is >= 
> `min_write_buffer_number_to_merge`.
> The RocksDB community has fixed this bug: 
> [https://github.com/facebook/rocksdb/pull/7921]
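A minimal sketch of the restriction described above, written as a hypothetical model rather than the actual change in the linked PR: the checkpoint only waits if enough immutable MemTables exist for a flush to be picked, so every wait has a flush that can end it. `ShouldWaitForFlush` is an invented name, and checking the immutable count + 1 in the stall condition is an assumption.

```cpp
#include <cassert>

// Hypothetical model of the proposed restriction (not the RocksDB patch).
// Returns true if the checkpoint should keep waiting for a flush.
bool ShouldWaitForFlush(int max_write_buffer_number,
                        int min_write_buffer_number_to_merge,
                        int num_immutable_memtables) {
  // Original check: would one more memtable hit the memtable limit?
  // (assumption: the immutable count + 1 is checked)
  bool would_stall = max_write_buffer_number > 3 &&
                     num_immutable_memtables + 1 >= max_write_buffer_number - 1;
  // Added restriction: only wait if a flush can actually be picked,
  // so the wait is guaranteed to have something that ends it.
  bool flush_possible =
      num_immutable_memtables >= min_write_buffer_number_to_merge;
  return would_stall && flush_possible;
}
```

With the configuration above, 2 immutable MemTables no longer cause a wait, while 3 immutable MemTables still do, because a flush will be picked for them.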
> h2. 2.3 Code that can reproduce the bug:
> [https://github.com/1996fanrui/fanrui-learning/blob/flink-1.12/module-java/src/main/java/com/dream/rocksdb/RocksDBCheckpointStuck.java]
> h1. 3. Interesting point
> This bug is triggered only when the number of sorted runs >= 
> `level0_file_num_compaction_trigger`, because 
> `WaitUntilFlushWouldNotStallWrites` contains an early `break`:
> {code:cpp}
> if (cfd->imm()->NumNotFlushed() <
>         cfd->ioptions()->min_write_buffer_number_to_merge &&
>     vstorage->l0_delay_trigger_count() <
>         mutable_cf_options.level0_file_num_compaction_trigger) {
>   break;
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/db_impl/db_impl_compaction_flush.cc#L1974]
> With Universal Compaction, `l0_delay_trigger_count()` can be >= 
> `level0_file_num_compaction_trigger`, so the loop does not break and the 
> bug is triggered.
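The interaction with the early `break` can be sketched the same way. This is a hypothetical model, not RocksDB code: `BreaksEarly` and `CheckpointStuck` are invented names, only the memtable-limit stall cause is modeled, the immutable count + 1 in the stall check is an assumption, and `level0_file_num_compaction_trigger = 4` in the usage below is an assumed value (it is not part of the configuration above).

```cpp
#include <cassert>

// Hypothetical model of the early exit quoted above (not RocksDB code).
// Returns true when the quoted condition lets the wait loop `break`.
bool BreaksEarly(int num_immutable_memtables,
                 int min_write_buffer_number_to_merge,
                 int l0_delay_trigger_count,
                 int level0_file_num_compaction_trigger) {
  return num_immutable_memtables < min_write_buffer_number_to_merge &&
         l0_delay_trigger_count < level0_file_num_compaction_trigger;
}

// Returns true if, in this model, the checkpoint waits for a flush that
// never comes. Only the memtable-limit stall cause is considered.
bool CheckpointStuck(int num_immutable_memtables,
                     int min_write_buffer_number_to_merge,
                     int max_write_buffer_number,
                     int l0_delay_trigger_count,
                     int level0_file_num_compaction_trigger) {
  if (BreaksEarly(num_immutable_memtables, min_write_buffer_number_to_merge,
                  l0_delay_trigger_count, level0_file_num_compaction_trigger)) {
    return false;  // the loop exits, so the checkpoint does not wait
  }
  // Assumption: one extra memtable is counted in the stall check.
  bool would_stall = max_write_buffer_number > 3 &&
                     num_immutable_memtables + 1 >= max_write_buffer_number - 1;
  // No flush is picked below min_write_buffer_number_to_merge.
  bool flush_possible =
      num_immutable_memtables >= min_write_buffer_number_to_merge;
  return would_stall && !flush_possible;
}
```

With 2 immutable MemTables and a single sorted run, `CheckpointStuck(2, 3, 4, 1, 4)` is false because the loop breaks; with 5 sorted runs, which Universal Compaction can reach, `CheckpointStuck(2, 3, 4, 5, 4)` is true.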



--
This message was sent by Atlassian Jira
(v8.3.4#803005)