[ 
https://issues.apache.org/jira/browse/KAFKA-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508694#comment-16508694
 ] 

ASF GitHub Bot commented on KAFKA-7023:
---------------------------------------

guozhangwang closed pull request #5166: KAFKA-7023: Move prepareForBulkLoad() 
call after customized RocksDBConfigSetter
URL: https://github.com/apache/kafka/pull/5166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index cfef035a4fd..6084ecbf1e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -130,10 +130,6 @@ public void openDB(final ProcessorContext context) {
         // (this could be a bug in the RocksDB code and their devs have been contacted).
         options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
 
-        if (prepareForBulkload) {
-            options.prepareForBulkLoad();
-        }
-
         wOptions = new WriteOptions();
         wOptions.setDisableWAL(true);
 
@@ -148,6 +144,11 @@ public void openDB(final ProcessorContext context) {
             final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
             configSetter.setConfig(name, options, configs);
         }
+
+        if (prepareForBulkload) {
+            options.prepareForBulkLoad();
+        }
+
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
 
         try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams RocksDB bulk loading config may not be honored with customized 
> RocksDBConfigSetter 
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7023
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7023
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Liquan Pei
>            Assignee: Liquan Pei
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We observed frequent L0 -> L1 compaction during Kafka Streams state recovery. 
> Some sample log:
> {code:java}
> 2018/06/08-00:04:50.892331 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892298) [db/compaction_picker_universal.cc:270] [default] 
> Universal: sorted runs files(6): files[3 0 0 0 1 1 38] max score 1.00
> 2018/06/08-00:04:50.892336 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892300) [db/compaction_picker_universal.cc:655] [default] 
> Universal: First candidate file 134[0] to reduce size amp.
> 2018/06/08-00:04:50.892338 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892302) [db/compaction_picker_universal.cc:686] [default] 
> Universal: size amp not needed. newer-files-total-size 13023497 
> earliest-file-size 2541530372
> 2018/06/08-00:04:50.892339 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892303) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 134[0].
> 2018/06/08-00:04:50.892341 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892304) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 134[0] with size 1007 (compensated size 1287)
> 2018/06/08-00:04:50.892343 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892306) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 133[1].
> 2018/06/08-00:04:50.892344 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 133[1] with size 4644 (compensated size 16124)
> 2018/06/08-00:04:50.892346 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 126[2].
> 2018/06/08-00:04:50.892348 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892308) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 126[2] with size 319764 (compensated size 319764)
> 2018/06/08-00:04:50.892349 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892309) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 4[3].
> 2018/06/08-00:04:50.892351 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892310) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping level 4[3] with size 2815574 (compensated size 2815574)
> 2018/06/08-00:04:50.892352 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 5[4].
> 2018/06/08-00:04:50.892357 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping level 5[4] with size 9870748 (compensated size 9870748)
> 2018/06/08-00:04:50.892358 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892313) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 6[5].
> {code}
> In customized RocksDBConfigSetter, we set 
> {code:java}
> level0_file_num_compaction_trigger=6 {code}
> During bulk loading, the following options are set: 
> [https://github.com/facebook/rocksdb/blob/master/options/options.cc] 
> {code:java}
> Options*
> Options::PrepareForBulkLoad()
> {
> // never slowdown ingest.
> level0_file_num_compaction_trigger = (1<<30);
> level0_slowdown_writes_trigger = (1<<30);
> level0_stop_writes_trigger = (1<<30);
> soft_pending_compaction_bytes_limit = 0;
> hard_pending_compaction_bytes_limit = 0;
> // no auto compactions please. The application should issue a
> // manual compaction after all data is loaded into L0.
> disable_auto_compactions = true;
> // A manual compaction run should pick all files in L0 in
> // a single compaction run.
> max_compaction_bytes = (static_cast<uint64_t>(1) << 60);
> // It is better to have only 2 levels, otherwise a manual
> // compaction would compact at every possible level, thereby
> // increasing the total time needed for compactions.
> num_levels = 2;
> // Need to allow more write buffers to allow more parallism
> // of flushes.
> max_write_buffer_number = 6;
> min_write_buffer_number_to_merge = 1;
> // When compaction is disabled, more parallel flush threads can
> // help with write throughput.
> max_background_flushes = 4;
> // Prevent a memtable flush to automatically promote files
> // to L1. This is helpful so that all files that are
> // input to the manual compaction are all at L0.
> max_background_compactions = 2;
> // The compaction would create large files in L1.
> target_file_size_base = 256 * 1024 * 1024;
> return this;
> }
> {code}
> In particular, the following values are set to a very large number to avoid 
> compactions and ensure that all files stay on L0: 
> {code:java}
> level0_file_num_compaction_trigger = (1<<30);
> level0_slowdown_writes_trigger = (1<<30);
> level0_stop_writes_trigger = (1<<30);
> {code}
> However, in the openDB code of RocksDBStore.java, we first call 
> options.prepareForBulkLoad() and then apply the configs from the customized 
> RocksDBConfigSetter. This may overwrite the configs set by the 
> prepareForBulkLoad() call. The fix is to move the prepareForBulkLoad() call 
> after the configs from the customized RocksDBConfigSetter are applied. 
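
The ordering problem described above can be illustrated with a small,
dependency-free sketch. Everything in it is an assumption made for
illustration: FakeOptions is a hypothetical stand-in for org.rocksdb.Options
that models only level0_file_num_compaction_trigger, its starting value of 4
is illustrative, and CUSTOM_SETTER stands in for a customized
RocksDBConfigSetter that sets the trigger to 6. It is not the RocksDBStore
code itself.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for org.rocksdb.Options; models a single option only.
final class FakeOptions {
    int level0FileNumCompactionTrigger = 4; // illustrative starting value

    FakeOptions prepareForBulkLoad() {
        // Mirrors the corresponding line quoted from Options::PrepareForBulkLoad().
        level0FileNumCompactionTrigger = 1 << 30;
        return this;
    }
}

final class BulkLoadOrdering {
    // Stand-in for a customized RocksDBConfigSetter that sets the trigger to 6.
    static final Consumer<FakeOptions> CUSTOM_SETTER =
        o -> o.level0FileNumCompactionTrigger = 6;

    // Order before the fix: bulk-load settings first, custom setter second.
    static int triggerBeforeFix() {
        final FakeOptions options = new FakeOptions();
        options.prepareForBulkLoad();
        CUSTOM_SETTER.accept(options); // overwrites the bulk-load value
        return options.level0FileNumCompactionTrigger;
    }

    // Order after the fix: custom setter first, bulk-load settings second.
    static int triggerAfterFix() {
        final FakeOptions options = new FakeOptions();
        CUSTOM_SETTER.accept(options);
        options.prepareForBulkLoad(); // bulk-load value wins
        return options.level0FileNumCompactionTrigger;
    }

    public static void main(final String[] args) {
        System.out.println("before fix: " + triggerBeforeFix());
        System.out.println("after fix:  " + triggerAfterFix());
    }
}
```

With the old order the trigger ends up at the customized value (6), so L0
compactions can still be triggered during restore; with the new order it ends
up at 1 << 30, which is what bulk loading expects.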



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
