This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8580ca  [FLINK-10912][rocksdb] Configurable RocksDBStateBackend 
options
a8580ca is described below

commit a8580ca08dab19f83ee87ea83e0801c516687e62
Author: Yun Tang <[email protected]>
AuthorDate: Mon Jan 28 17:28:31 2019 +0800

    [FLINK-10912][rocksdb] Configurable RocksDBStateBackend options
    
    This closes #7586.
---
 .../rocks_db_configurable_configuration.html       |  66 ++++
 .../generated/rocks_db_configuration.html          |  10 +
 docs/ops/config.md                                 |   7 +
 docs/ops/state/large_state_tuning.md               |  73 ++--
 .../state/ConfigurableOptionsFactory.java          |  39 ++
 .../state/DefaultConfigurableOptionsFactory.java   | 425 +++++++++++++++++++++
 .../state/RocksDBConfigurableOptions.java          | 123 ++++++
 .../contrib/streaming/state/RocksDBOptions.java    |  25 ++
 .../streaming/state/RocksDBStateBackend.java       |  89 ++++-
 .../state/RocksDBStateBackendConfigTest.java       | 169 ++++++++
 10 files changed, 991 insertions(+), 35 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html 
b/docs/_includes/generated/rocks_db_configurable_configuration.html
new file mode 100644
index 0000000..638a3a5
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -0,0 +1,66 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>state.backend.rocksdb.block.blocksize</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The approximate size (in bytes) of user data packed per block. 
RocksDB has default blocksize as '4KB'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.block.cache-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The amount of the cache for data blocks in RocksDB. RocksDB 
has default block-cache size as '8MB'.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '10MB'.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.compaction.level.target-file-size-base</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '2MB'.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.compaction.level.use-dynamic-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>If true, RocksDB will pick target size of each level 
dynamically. From an empty DB, RocksDB would make last level the base level, 
which means merging L0 data into the last level, until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. RocksDB has default configuration as 'false'. For more information, 
please refer to <a 
href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is
 [...]
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.compaction.style</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The specified compaction style for DB. Candidate compaction 
styles are LEVEL, FIFO or UNIVERSAL, and RocksDB chooses 'LEVEL' as the 
default style.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.files.open</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The maximum number of open files that can be used by the DB, 
'-1' means no limit. RocksDB has default configuration as '5000'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.thread.num</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The maximum number of concurrent background flush and 
compaction jobs. RocksDB has default configuration as '1'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.writebuffer.count</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The maximum number of write buffers that are built up in 
memory. RocksDB has default configuration as '2'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.writebuffer.number-to-merge</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The minimum number of write buffers that will be merged 
together before writing to storage. RocksDB has default configuration as 
'1'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.writebuffer.size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk file. RocksDB has a default 
writebuffer size of '4MB'.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/rocks_db_configuration.html 
b/docs/_includes/generated/rocks_db_configuration.html
index b88c18e..88e8c46 100644
--- a/docs/_includes/generated/rocks_db_configuration.html
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -18,6 +18,16 @@
             <td>The local directory (on the TaskManager) where RocksDB puts 
its files.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.rocksdb.options-factory</h5></td>
+            <td style="word-wrap: 
break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
+            <td>The options factory class for RocksDB to create DBOptions and 
ColumnFamilyOptions. The default options factory is 
org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and 
it reads the configured options which are provided in 
'RocksDBConfigurableOptions'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.predefined-options</h5></td>
+            <td style="word-wrap: break-word;">"DEFAULT"</td>
+            <td>The predefined settings for RocksDB DBOptions and 
ColumnFamilyOptions by Flink community. Current supported candidate 
predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, 
SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user 
customized options and options from the OptionsFactory are applied on top of 
these predefined ones.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
             <td style="word-wrap: break-word;">"HEAP"</td>
             <td>This determines the factory for timer service state 
implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an 
implementation based on RocksDB .</td>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 5c9ad82..17c0814 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -162,6 +162,13 @@ The configuration keys in this section are independent of 
the used resource mana
 
 {% include generated/rocks_db_configuration.html %}
 
+### RocksDB Configurable Options
+Specific RocksDB configurable options, provided by Flink, to create a 
corresponding `ConfigurableOptionsFactory`.
+The created one would be used as the default `OptionsFactory` in 
`RocksDBStateBackend`
+unless the user defines an `OptionsFactory` and sets it via 
`RocksDBStateBackend.setOptions(optionsFactory)`.
+
+{% include generated/rocks_db_configurable_configuration.html %}
+
 ### Queryable State
 
 {% include generated/queryable_state_configuration.html %}
diff --git a/docs/ops/state/large_state_tuning.md 
b/docs/ops/state/large_state_tuning.md
index d175ca6..effea99 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -153,36 +153,61 @@ Possible choices are `heap` (to store timers on the heap, 
default) and `rocksdb`
 <span class="label label-info">Note</span> *The combination RocksDB state 
backend / with incremental checkpoint / with heap-based timers currently does 
NOT support asynchronous snapshots for the timers state.
 Other state like keyed state is still snapshotted asynchronously. Please note 
that this is not a regression from previous versions and will be resolved with 
`FLINK-10026`.*
 
-**Passing Options to RocksDB**
+**Predefined Options**
 
-{% highlight java %}
-RocksDBStateBackend.setOptions(new MyOptions());
+Flink provides some predefined collections of options for RocksDB for different 
settings, and there are two ways
+to pass these predefined options to RocksDB:
+  - Configure the predefined options through `flink-conf.yaml` via option key 
`state.backend.rocksdb.predefined-options`.
+    The default value of this option is `DEFAULT` which means 
`PredefinedOptions.DEFAULT`.
+  - Set the predefined options programmatically, e.g. 
`RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
 
-public class MyOptions implements OptionsFactory {
+We expect to accumulate more such profiles over time. Feel free to contribute 
such predefined option profiles when you
+find a set of options that work well and seem representative for certain 
workloads.
 
-    @Override
-    public DBOptions createDBOptions(DBOptions currentOptions) {
-       return currentOptions.setIncreaseParallelism(4)
-                  .setUseFsync(false);
-    }
-               
-    @Override
-    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
-       return currentOptions.setTableFormatConfig(
-               new BlockBasedTableConfig()
-                       .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB
-                       .setBlockSize(128 * 1024));            // 128 KB
-    }
-}
-{% endhighlight %}
+<span class="label label-info">Note</span> Predefined options which are set 
programmatically override the ones configured via `flink-conf.yaml`.
 
-**Predefined Options**
+**Passing Options Factory to RocksDB**
 
-Flink provides some predefined collections of option for RocksDB for different 
settings, which can be set for example via
-`RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
+There exist two ways to pass an options factory to RocksDB in Flink:
 
-We expect to accumulate more such profiles over time. Feel free to contribute 
such predefined option profiles when you
-found a set of options that work well and seem representative for certain 
workloads.
+  - Configure options factory through `flink-conf.yaml`. You could set the 
options factory class name via option key 
`state.backend.rocksdb.options-factory`.
+    The default value for this option is 
`org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`, 
and all candidate configurable options are defined in 
`RocksDBConfigurableOptions`.
+    Moreover, you could also define your customized and configurable options 
factory class like below and pass the class name to 
`state.backend.rocksdb.options-factory`.
+
+    {% highlight java %}
+
+    public class MyOptionsFactory implements ConfigurableOptionsFactory {
+
+        private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
+        private long blockCacheSize = DEFAULT_SIZE;
+
+        @Override
+        public DBOptions createDBOptions(DBOptions currentOptions) {
+            return currentOptions.setIncreaseParallelism(4)
+                   .setUseFsync(false);
+        }
+
+        @Override
+        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
+            return currentOptions.setTableFormatConfig(
+                new BlockBasedTableConfig()
+                    .setBlockCacheSize(blockCacheSize)
+                    .setBlockSize(128 * 1024));            // 128 KB
+        }
+
+        @Override
+        public OptionsFactory configure(Configuration configuration) {
+            this.blockCacheSize =
+                configuration.getLong("my.custom.rocksdb.block.cache.size", 
DEFAULT_SIZE);
+            return this;
+        }
+    }
+    {% endhighlight %}
+
+  - Set the options factory programmatically, e.g. 
`RocksDBStateBackend.setOptions(new MyOptionsFactory());`
+
+<span class="label label-info">Note</span> An options factory which is set 
programmatically overrides the one configured via `flink-conf.yaml`,
+and the options factory has a higher priority than the predefined options if 
ever configured or set.
 
 <span class="label label-info">Note</span> RocksDB is a native library that 
allocates memory directly from the process,
 and not from the JVM. Any memory you assign to RocksDB will have to be 
accounted for, typically by decreasing the JVM heap size
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
new file mode 100644
index 0000000..80d1a1f
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An interface for an options factory that picks up additional parameters 
from a configuration.
+ */
+public interface ConfigurableOptionsFactory extends OptionsFactory {
+
+       /**
+        * Creates a variant of the options factory that applies additional 
configuration parameters.
+        *
+        * <p>If no configuration is applied, or if the method directly applies 
configuration values to
+        * the (mutable) options factory object, this method may return the 
original options factory object.
+        * Otherwise it typically returns a modified copy.
+        *
+        * @param configuration The configuration to pick the values from.
+        * @return A reconfigured options factory.
+        */
+       OptionsFactory configure(Configuration configuration);
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
new file mode 100644
index 0000000..5f5f7d1
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.TableFormatConfig;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.COMPACTION_STYLE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BUFFER_SIZE;
+
+/**
+ * An implementation of {@link ConfigurableOptionsFactory} using options 
provided by {@link RocksDBConfigurableOptions},
+ * acting as the default options factory within {@link RocksDBStateBackend} 
if the user has not defined an {@link OptionsFactory}.
+ *
+ * <p>This implementation also provides some setters to let users create an 
{@link OptionsFactory} conveniently.
+ */
+public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFactory {
+
+       private final Map<String, String> configuredOptions = new HashMap<>();
+
+       @Override
+       public DBOptions createDBOptions(DBOptions currentOptions) {
+               if (isOptionConfigured(MAX_BACKGROUND_THREADS)) {
+                       
currentOptions.setIncreaseParallelism(getMaxBackgroundThreads());
+               }
+
+               if (isOptionConfigured(MAX_OPEN_FILES)) {
+                       currentOptions.setMaxOpenFiles(getMaxOpenFiles());
+               }
+
+               return currentOptions;
+       }
+
+       @Override
+       public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
+               if (isOptionConfigured(COMPACTION_STYLE)) {
+                       currentOptions.setCompactionStyle(getCompactionStyle());
+               }
+
+               if (isOptionConfigured(USE_DYNAMIC_LEVEL_SIZE)) {
+                       
currentOptions.setLevelCompactionDynamicLevelBytes(getUseDynamicLevelSize());
+               }
+
+               if (isOptionConfigured(TARGET_FILE_SIZE_BASE)) {
+                       
currentOptions.setTargetFileSizeBase(getTargetFileSizeBase());
+               }
+
+               if (isOptionConfigured(MAX_SIZE_LEVEL_BASE)) {
+                       
currentOptions.setMaxBytesForLevelBase(getMaxSizeLevelBase());
+               }
+
+               if (isOptionConfigured(WRITE_BUFFER_SIZE)) {
+                       currentOptions.setWriteBufferSize(getWriteBufferSize());
+               }
+
+               if (isOptionConfigured(MAX_WRITE_BUFFER_NUMBER)) {
+                       
currentOptions.setMaxWriteBufferNumber(getMaxWriteBufferNumber());
+               }
+
+               if (isOptionConfigured(MIN_WRITE_BUFFER_NUMBER_TO_MERGE)) {
+                       
currentOptions.setMinWriteBufferNumberToMerge(getMinWriteBufferNumberToMerge());
+               }
+
+               TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+
+               BlockBasedTableConfig blockBasedTableConfig;
+               if (tableFormatConfig == null) {
+                       blockBasedTableConfig = new BlockBasedTableConfig();
+               } else {
+                       if (tableFormatConfig instanceof PlainTableConfig) {
+                               // if the table format config is 
PlainTableConfig, we just return current column-family options
+                               return currentOptions;
+                       } else {
+                               blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
+                       }
+               }
+
+               if (isOptionConfigured(BLOCK_SIZE)) {
+                       blockBasedTableConfig.setBlockSize(getBlockSize());
+               }
+
+               if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
+                       
blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
+               }
+
+               return 
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+       }
+
+       public Map<String, String> getConfiguredOptions() {
+               return new HashMap<>(configuredOptions);
+       }
+
+       private boolean isOptionConfigured(ConfigOption configOption) {
+               return configuredOptions.containsKey(configOption.key());
+       }
+
+       
//--------------------------------------------------------------------------
+       // Maximum number of concurrent background flush and compaction threads
+       
//--------------------------------------------------------------------------
+
+       private int getMaxBackgroundThreads() {
+               return 
Integer.parseInt(getInternal(MAX_BACKGROUND_THREADS.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setMaxBackgroundThreads(int 
totalThreadCount) {
+               Preconditions.checkArgument(totalThreadCount > 0);
+               configuredOptions.put(MAX_BACKGROUND_THREADS.key(), 
String.valueOf(totalThreadCount));
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // Maximum number of open files
+       
//--------------------------------------------------------------------------
+
+       private int getMaxOpenFiles() {
+               return Integer.parseInt(getInternal(MAX_OPEN_FILES.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setMaxOpenFiles(int 
maxOpenFiles) {
+               configuredOptions.put(MAX_OPEN_FILES.key(), 
String.valueOf(maxOpenFiles));
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // The style of compaction for DB.
+       
//--------------------------------------------------------------------------
+
+       private CompactionStyle getCompactionStyle() {
+               return 
CompactionStyle.valueOf(getInternal(COMPACTION_STYLE.key()).toUpperCase());
+       }
+
+       public DefaultConfigurableOptionsFactory 
setCompactionStyle(CompactionStyle compactionStyle) {
+               setInternal(COMPACTION_STYLE.key(), compactionStyle.name());
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // Whether to configure RocksDB to pick target size of each level 
dynamically.
+       
//--------------------------------------------------------------------------
+
+       private boolean getUseDynamicLevelSize() {
+               return 
getInternal(USE_DYNAMIC_LEVEL_SIZE.key()).compareToIgnoreCase("false") != 0;
+       }
+
+       public DefaultConfigurableOptionsFactory setUseDynamicLevelSize(boolean 
value) {
+               configuredOptions.put(USE_DYNAMIC_LEVEL_SIZE.key(), value ? 
"true" : "false");
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // The target file size for compaction, i.e., the per-file size for 
level-1
+       
//--------------------------------------------------------------------------
+
+       private long getTargetFileSizeBase() {
+               return 
MemorySize.parseBytes(getInternal(TARGET_FILE_SIZE_BASE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setTargetFileSizeBase(String 
targetFileSizeBase) {
+               
Preconditions.checkArgument(MemorySize.parseBytes(targetFileSizeBase) > 0,
+                       "Invalid configuration " + targetFileSizeBase + " for 
target file size base.");
+               setInternal(TARGET_FILE_SIZE_BASE.key(), targetFileSizeBase);
+               return this;
+       }
+
+
+       
//--------------------------------------------------------------------------
+       // Maximum total data size for a level, i.e., the max total size for 
level-1
+       
//--------------------------------------------------------------------------
+
+       private long getMaxSizeLevelBase() {
+               return 
MemorySize.parseBytes(getInternal(MAX_SIZE_LEVEL_BASE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setMaxSizeLevelBase(String 
maxSizeLevelBase) {
+               
Preconditions.checkArgument(MemorySize.parseBytes(maxSizeLevelBase) > 0,
+                       "Invalid configuration " + maxSizeLevelBase + " for max 
size of level base.");
+               setInternal(MAX_SIZE_LEVEL_BASE.key(), maxSizeLevelBase);
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // Amount of data to build up in memory (backed by an unsorted log on 
disk)
+       // before converting to a sorted on-disk file. Larger values increase
+       // performance, especially during bulk loads.
+       
//--------------------------------------------------------------------------
+
+       private long getWriteBufferSize() {
+               return 
MemorySize.parseBytes(getInternal(WRITE_BUFFER_SIZE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setWriteBufferSize(String 
writeBufferSize) {
+               
Preconditions.checkArgument(MemorySize.parseBytes(writeBufferSize) > 0,
+                       "Invalid configuration " + writeBufferSize + " for 
write-buffer size.");
+
+               setInternal(WRITE_BUFFER_SIZE.key(), writeBufferSize);
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // The maximum number of write buffers that are built up in memory.
+       
//--------------------------------------------------------------------------
+
+       private int getMaxWriteBufferNumber() {
+               return 
Integer.parseInt(getInternal(MAX_WRITE_BUFFER_NUMBER.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setMaxWriteBufferNumber(int 
writeBufferNumber) {
+               Preconditions.checkArgument(writeBufferNumber > 0,
+                       "Invalid configuration " + writeBufferNumber + " for 
max write-buffer number.");
+               setInternal(MAX_WRITE_BUFFER_NUMBER.key(), 
Integer.toString(writeBufferNumber));
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // The minimum number that will be merged together before writing to 
storage
+       
//--------------------------------------------------------------------------
+
+       private int getMinWriteBufferNumberToMerge() {
+               return 
Integer.parseInt(getInternal(MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory 
setMinWriteBufferNumberToMerge(int writeBufferNumber) {
+               Preconditions.checkArgument(writeBufferNumber > 0,
+                       "Invalid configuration " + writeBufferNumber + " for 
min write-buffer number to merge.");
+               setInternal(MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), 
Integer.toString(writeBufferNumber));
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // Approximate size of user data packed per block. Note that the block 
size
+       // specified here corresponds to uncompressed data. The actual size of 
the
+       // unit read from disk may be smaller if compression is enabled
+       
//--------------------------------------------------------------------------
+
+       private long getBlockSize() {
+               return MemorySize.parseBytes(getInternal(BLOCK_SIZE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setBlockSize(String blockSize) 
{
+               Preconditions.checkArgument(MemorySize.parseBytes(blockSize) > 
0,
+                       "Invalid configuration " + blockSize + " for block 
size.");
+               setInternal(BLOCK_SIZE.key(), blockSize);
+               return this;
+       }
+
+       
//--------------------------------------------------------------------------
+       // The amount of the cache for data blocks in RocksDB
+       
//--------------------------------------------------------------------------
+
+       private long getBlockCacheSize() {
+               return 
MemorySize.parseBytes(getInternal(BLOCK_CACHE_SIZE.key()));
+       }
+
+       public DefaultConfigurableOptionsFactory setBlockCacheSize(String 
blockCacheSize) {
+               
Preconditions.checkArgument(MemorySize.parseBytes(blockCacheSize) > 0,
+                       "Invalid configuration " + blockCacheSize + " for block 
cache size.");
+               setInternal(BLOCK_CACHE_SIZE.key(), blockCacheSize);
+
+               return this;
+       }
+
+       private static final String[] CANDIDATE_CONFIGS = new String[]{
+               // configurable DBOptions
+               MAX_BACKGROUND_THREADS.key(),
+               MAX_OPEN_FILES.key(),
+
+               // configurable ColumnFamilyOptions
+               COMPACTION_STYLE.key(),
+               USE_DYNAMIC_LEVEL_SIZE.key(),
+               TARGET_FILE_SIZE_BASE.key(),
+               MAX_SIZE_LEVEL_BASE.key(),
+               WRITE_BUFFER_SIZE.key(),
+               MAX_WRITE_BUFFER_NUMBER.key(),
+               MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
+               BLOCK_SIZE.key(),
+               BLOCK_CACHE_SIZE.key()
+       };
+
+       private static final Set<String> POSITIVE_INT_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+               MAX_BACKGROUND_THREADS.key(),
+               MAX_WRITE_BUFFER_NUMBER.key(),
+               MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()
+       ));
+
+       private static final Set<String> SIZE_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+               TARGET_FILE_SIZE_BASE.key(),
+               MAX_SIZE_LEVEL_BASE.key(),
+               WRITE_BUFFER_SIZE.key(),
+               BLOCK_SIZE.key(),
+               BLOCK_CACHE_SIZE.key()
+       ));
+
+       private static final Set<String> BOOLEAN_CONFIG_SET = new 
HashSet<>(Collections.singletonList(
+               USE_DYNAMIC_LEVEL_SIZE.key()
+       ));
+
+       private static final Set<String> COMPACTION_STYLE_SET = 
Arrays.stream(CompactionStyle.values())
+               .map(c -> c.name().toLowerCase()).collect(Collectors.toSet());
+
+       /**
+        * Creates a {@link DefaultConfigurableOptionsFactory} instance from a 
{@link Configuration}.
+        *
+        * <p>If no options within {@link RocksDBConfigurableOptions} has ever 
been configured,
+        * the created OptionsFactory would not override anything defined in 
{@link PredefinedOptions}.
+        *
+        * @param configuration Configuration to be used for the 
ConfigurableOptionsFactory creation
+        * @return A ConfigurableOptionsFactory created from the given 
configuration
+        */
+       @Override
+       public DefaultConfigurableOptionsFactory configure(Configuration 
configuration) {
+               for (String key : CANDIDATE_CONFIGS) {
+                       String newValue = configuration.getString(key, null);
+
+                       if (newValue != null) {
+                               if (checkArgumentValid(key, newValue)) {
+                                       this.configuredOptions.put(key, 
newValue);
+                               }
+                       }
+               }
+               return this;
+       }
+
+       @Override
+       public String toString() {
+               return "DefaultConfigurableOptionsFactory{" +
+                       "configuredOptions=" + configuredOptions +
+                       '}';
+       }
+
+       /**
+        * Helper method to check whether the (key,value) is valid through 
given configuration.
+        *
+        * @param key The configuration key which is configurable in {@link 
RocksDBConfigurableOptions}.
+        * @param value The value within given configuration.
+        *
+        * @return whether the given key and value in string format is legal.
+        */
+       private static boolean checkArgumentValid(String key, String value) {
+               if (POSITIVE_INT_CONFIG_SET.contains(key)) {
+
+                       Preconditions.checkArgument(Integer.parseInt(value) > 0,
+                               "Configured value for key: " + key + " must be 
larger than 0.");
+               } else if (SIZE_CONFIG_SET.contains(key)) {
+
+                       
Preconditions.checkArgument(MemorySize.parseBytes(value) > 0,
+                               "Configured size for key: " + key + " must be 
larger than 0.");
+               } else if (BOOLEAN_CONFIG_SET.contains(key)) {
+
+                       
Preconditions.checkArgument("true".equalsIgnoreCase(value) || 
"false".equalsIgnoreCase(value),
+                               "The configured boolean value: " + value + " 
for key: " + key + " is illegal.");
+               } else if (key.equals(COMPACTION_STYLE.key())) {
+                       value = value.toLowerCase();
+                       
Preconditions.checkArgument(COMPACTION_STYLE_SET.contains(value),
+                               "Compaction style: " + value + " is not 
recognized with legal styles: " + String.join(", ", COMPACTION_STYLE_SET));
+               }
+               return true;
+       }
+
+       /**
+        * Sets the configuration with (key, value) if the key is predefined, 
otherwise throws IllegalArgumentException.
+        *
+        * @param key The configuration key, if key is not predefined, throws 
IllegalArgumentException out.
+        * @param value The configuration value.
+        */
+       private void setInternal(String key, String value) {
+               Preconditions.checkArgument(value != null && !value.isEmpty(),
+                       "The configuration value must not be empty.");
+
+               configuredOptions.put(key, value);
+       }
+
+       /**
+        * Returns the value in string format with the given key.
+        *
+        * @param key The configuration-key to query in string format.
+        */
+       private String getInternal(String key) {
+               Preconditions.checkArgument(configuredOptions.containsKey(key),
+                       "The configuration " + key + " has not been 
configured.");
+
+               return configuredOptions.get(key);
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
new file mode 100644
index 0000000..089f08c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.description.Description;
+
+import java.io.Serializable;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.rocksdb.CompactionStyle.FIFO;
+import static org.rocksdb.CompactionStyle.LEVEL;
+import static org.rocksdb.CompactionStyle.UNIVERSAL;
+
+/**
+ * This class contains the configuration options for the {@link 
DefaultConfigurableOptionsFactory}.
+ *
+ * <p>If nothing specified, RocksDB's options would be configured by {@link 
PredefinedOptions} and user-defined {@link OptionsFactory}.
+ *
+ * <p>If some options has been specifically configured, a corresponding {@link 
DefaultConfigurableOptionsFactory} would be created
+ * and applied on top of {@link PredefinedOptions} except if a user-defined 
{@link OptionsFactory} overrides it.
+ */
+public class RocksDBConfigurableOptions implements Serializable {
+
+       
//--------------------------------------------------------------------------
+       // Provided configurable DBOptions within Flink
+       
//--------------------------------------------------------------------------
+
+       public static final ConfigOption<String> MAX_BACKGROUND_THREADS =
+               key("state.backend.rocksdb.thread.num")
+                       .noDefaultValue()
+                       .withDescription("The maximum number of concurrent 
background flush and compaction jobs. " +
+                               "RocksDB has default configuration as '1'.");
+
+       public static final ConfigOption<String> MAX_OPEN_FILES =
+               key("state.backend.rocksdb.files.open")
+                       .noDefaultValue()
+                       .withDescription("The maximum number of open files that 
can be used by the DB, '-1' means no limit. " +
+                               "RocksDB has default configuration as '5000'.");
+
+       
//--------------------------------------------------------------------------
+       // Provided configurable ColumnFamilyOptions within Flink
+       
//--------------------------------------------------------------------------
+
+       public static final ConfigOption<String> COMPACTION_STYLE =
+               key("state.backend.rocksdb.compaction.style")
+                       .noDefaultValue()
+                       .withDescription(String.format("The specified 
compaction style for DB. Candidate compaction style is %s, %s or %s, " +
+                       "and RocksDB chooses '%s' as default 
style.", LEVEL.name(), FIFO.name(), UNIVERSAL.name(),
+                               LEVEL.name()));
+
+       public static final ConfigOption<String> USE_DYNAMIC_LEVEL_SIZE =
+               key("state.backend.rocksdb.compaction.level.use-dynamic-size")
+                       .noDefaultValue()
+                       .withDescription(Description.builder().text("If true, 
RocksDB will pick target size of each level dynamically. From an empty DB, ")
+                               .text("RocksDB would make last level the base 
level, which means merging L0 data into the last level, ")
+                               .text("until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. ")
+                               .text("RocksDB has default configuration as 
'false'. ")
+                               .text("For more information, please refer to 
%s",
+                                       
link("https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true";,
+                                               "RocksDB's doc."))
+                               .build());
+
+       public static final ConfigOption<String> TARGET_FILE_SIZE_BASE =
+               
key("state.backend.rocksdb.compaction.level.target-file-size-base")
+                       .noDefaultValue()
+                       .withDescription("The target file size for compaction, 
which determines a level-1 file size. " +
+                               "RocksDB has default configuration as '2MB'.");
+
+       public static final ConfigOption<String> MAX_SIZE_LEVEL_BASE =
+               
key("state.backend.rocksdb.compaction.level.max-size-level-base")
+                       .noDefaultValue()
+                       .withDescription("The upper-bound of the total size of 
level base files in bytes. " +
+                               "RocksDB has default configuration as '10MB'.");
+
+       public static final ConfigOption<String> WRITE_BUFFER_SIZE =
+               key("state.backend.rocksdb.writebuffer.size")
+                       .noDefaultValue()
+                       .withDescription("The amount of data built up in memory 
(backed by an unsorted log on disk) " +
+                               "before converting to sorted on-disk 
RocksDB has default writebuffer size as '4MB'.");
+
+       public static final ConfigOption<String> MAX_WRITE_BUFFER_NUMBER =
+               key("state.backend.rocksdb.writebuffer.count")
+                       .noDefaultValue()
+                       .withDescription("The maximum number of write buffers 
that are built up in memory. " +
+                               "RocksDB has default configuration as '2'.");
+
+       public static final ConfigOption<String> 
MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
+               key("state.backend.rocksdb.writebuffer.number-to-merge")
+                       .noDefaultValue()
+                       .withDescription("The minimum number of write buffers 
that will be merged together before writing to storage. " +
+                               "RocksDB has default configuration as '1'.");
+
+       public static final ConfigOption<String> BLOCK_SIZE =
+               key("state.backend.rocksdb.block.blocksize")
+                       .noDefaultValue()
+                       .withDescription("The approximate size (in bytes) of 
user data packed per block. " +
+                               "RocksDB has default blocksize as '4KB'.");
+
+       public static final ConfigOption<String> BLOCK_CACHE_SIZE =
+               key("state.backend.rocksdb.block.cache-size")
+                       .noDefaultValue()
+                       .withDescription("The amount of the cache for data 
blocks in RocksDB. " +
+                               "RocksDB has default block-cache size as 
'8MB'.");
+
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index f7a5557..8fa497b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -21,6 +21,10 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.DEFAULT;
+import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED;
+import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.SPINNING_DISK_OPTIMIZED;
+import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
 
@@ -61,4 +65,25 @@ public class RocksDBOptions {
                .withDescription("This determines if compaction filter to 
cleanup state with TTL is enabled for backend." +
                        "Note: User can still decide in state TTL configuration 
in state descriptor " +
                        "whether the filter is active for particular state or 
not.");
+       /**
+        * The predefined settings for RocksDB DBOptions and 
ColumnFamilyOptions by Flink community.
+        */
+       public static final ConfigOption<String> PREDEFINED_OPTIONS = 
ConfigOptions
+               .key("state.backend.rocksdb.predefined-options")
+               .defaultValue(DEFAULT.name())
+               .withDescription(String.format("The predefined settings for 
RocksDB DBOptions and ColumnFamilyOptions by Flink community. " +
+                       "Current supported candidate predefined-options are %s, 
%s, %s or %s. Note that user customized options and options " +
+                       "from the OptionsFactory are applied on top of these 
predefined ones.",
+                       DEFAULT.name(), SPINNING_DISK_OPTIMIZED.name(), 
SPINNING_DISK_OPTIMIZED_HIGH_MEM.name(), FLASH_SSD_OPTIMIZED.name()));
+
+       /**
+        * The options factory class for RocksDB to create DBOptions and 
ColumnFamilyOptions.
+        */
+       public static final ConfigOption<String> OPTIONS_FACTORY = ConfigOptions
+               .key("state.backend.rocksdb.options-factory")
+               .defaultValue(DefaultConfigurableOptionsFactory.class.getName())
+               .withDescription(String.format("The options factory class for 
RocksDB to create DBOptions and ColumnFamilyOptions. " +
+                               "The default options factory is %s, and it 
would read the configured options which provided in 
'RocksDBConfigurableOptions'.",
+                               
DefaultConfigurableOptionsFactory.class.getName()));
+
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 5db2e9b..a1bc0c4 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -45,6 +45,8 @@ import 
org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -123,7 +125,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        private File[] localRocksDbDirectories;
 
        /** The pre-configured option settings. */
-       private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
+       @Nullable
+       private PredefinedOptions predefinedOptions;
 
        /** The options factory to create the RocksDB options in the cluster. */
        @Nullable
@@ -338,9 +341,21 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                // configure metric options
                this.defaultMetricOptions = 
RocksDBNativeMetricOptions.fromConfig(config);
 
-               // copy remaining settings
-               this.predefinedOptions = original.predefinedOptions;
-               this.optionsFactory = original.optionsFactory;
+               // configure RocksDB predefined options
+               this.predefinedOptions = original.predefinedOptions == null ?
+                       
PredefinedOptions.valueOf(config.getString(RocksDBOptions.PREDEFINED_OPTIONS)) 
: original.predefinedOptions;
+               LOG.info("Using predefined options: {}.", 
predefinedOptions.name());
+
+               // configure RocksDB options factory
+               try {
+                       this.optionsFactory = configureOptionsFactory(
+                               original.optionsFactory,
+                               
config.getString(RocksDBOptions.OPTIONS_FACTORY),
+                               config,
+                               classLoader);
+               } catch (DynamicCodeLoadingException e) {
+                       throw new FlinkRuntimeException(e);
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -524,6 +539,53 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                                asyncSnapshots);
        }
 
+       private OptionsFactory configureOptionsFactory(
+                       @Nullable OptionsFactory originalOptionsFactory,
+                       String factoryClassName,
+                       Configuration config,
+                       ClassLoader classLoader) throws 
DynamicCodeLoadingException {
+
+               if (originalOptionsFactory != null) {
+                       if (originalOptionsFactory instanceof 
ConfigurableOptionsFactory) {
+                               originalOptionsFactory = 
((ConfigurableOptionsFactory) originalOptionsFactory).configure(config);
+                       }
+                       LOG.info("Using application-defined options factory: 
{}.", originalOptionsFactory);
+
+                       return originalOptionsFactory;
+               }
+
+               // if using DefaultConfigurableOptionsFactory by default, we 
could avoid reflection to speed up.
+               if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
+                       DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
+                       optionsFactory.configure(config);
+                       LOG.info("Using default options factory: {}.", 
optionsFactory);
+
+                       return optionsFactory;
+               } else {
+                       try {
+                               @SuppressWarnings("rawtypes")
+                               Class<? extends OptionsFactory> clazz =
+                                       Class.forName(factoryClassName, false, 
classLoader)
+                                               
.asSubclass(OptionsFactory.class);
+
+                               OptionsFactory optionsFactory = 
clazz.newInstance();
+                               if (optionsFactory instanceof 
ConfigurableOptionsFactory) {
+                                       optionsFactory = 
((ConfigurableOptionsFactory) optionsFactory).configure(config);
+                               }
+                               LOG.info("Using configured options factory: 
{}.", optionsFactory);
+
+                               return optionsFactory;
+                       } catch (ClassNotFoundException e) {
+                               throw new DynamicCodeLoadingException(
+                                       "Cannot find configured options factory 
class: " + factoryClassName, e);
+                       } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                               throw new DynamicCodeLoadingException("The 
class configured under '" +
+                                       RocksDBOptions.OPTIONS_FACTORY.key() + 
"' is not a valid options factory (" +
+                                       factoryClassName + ')', e);
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Parameters
        // 
------------------------------------------------------------------------
@@ -663,9 +725,10 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        /**
         * Sets the predefined options for RocksDB.
         *
-        * <p>If a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
+        * <p>If user-configured options within {@link 
RocksDBConfigurableOptions} is set (through flink-conf.yaml)
+        * or a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
         * then the options from the factory are applied on top of the here 
specified
-        * predefined options.
+        * predefined options and customized options.
         *
         * @param options The options to set (must not be null).
         */
@@ -678,12 +741,16 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * The default options (if nothing was set via {@link 
#setPredefinedOptions(PredefinedOptions)})
         * are {@link PredefinedOptions#DEFAULT}.
         *
-        * <p>If a user-defined  options factory is set (via {@link 
#setOptions(OptionsFactory)}),
-        * then the options from the factory are applied on top of the 
predefined options.
+        * <p>If user-configured options within {@link 
RocksDBConfigurableOptions} is set (through flink-conf.yaml)
+        * or a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
+        * then the options from the factory are applied on top of the 
predefined and customized options.
         *
         * @return The currently set predefined options for RocksDB.
         */
        public PredefinedOptions getPredefinedOptions() {
+               if (predefinedOptions == null) {
+                       predefinedOptions = PredefinedOptions.DEFAULT;
+               }
                return predefinedOptions;
        }
 
@@ -718,9 +785,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        public DBOptions getDbOptions() {
                // initial options from pre-defined profile
-               DBOptions opt = predefinedOptions.createDBOptions();
+               DBOptions opt = getPredefinedOptions().createDBOptions();
 
-               // add user-defined options, if specified
+               // add user-defined options factory, if specified
                if (optionsFactory != null) {
                        opt = optionsFactory.createDBOptions(opt);
                }
@@ -736,7 +803,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        public ColumnFamilyOptions getColumnOptions() {
                // initial options from pre-defined profile
-               ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions();
+               ColumnFamilyOptions opt = 
getPredefinedOptions().createColumnOptions();
 
                // add user-defined options, if specified
                if (optionsFactory != null) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 7329646..8044a3a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -45,11 +46,14 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.DBOptions;
+import org.rocksdb.util.SizeUnit;
 
 import java.io.File;
+import java.io.IOException;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -383,8 +387,17 @@ public class RocksDBStateBackendConfigTest {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
 
+               // verify that we would use PredefinedOptions.DEFAULT by 
default.
                assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
 
+               // verify that user could configure predefined options via 
flink-conf.yaml
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.PREDEFINED_OPTIONS, 
PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
+               rocksDbBackend = new RocksDBStateBackend(checkpointPath);
+               rocksDbBackend = rocksDbBackend.configure(configuration, 
getClass().getClassLoader());
+               assertEquals(PredefinedOptions.FLASH_SSD_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
+
+               // verify that predefined options could be set programmatically 
and override pre-configured one.
                
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
                assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
 
@@ -394,10 +407,125 @@ public class RocksDBStateBackendConfigTest {
        }
 
        @Test
+       public void testSetConfigurableOptions() throws Exception  {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertNull(rocksDbBackend.getOptions());
+
+               DefaultConfigurableOptionsFactory customizedOptions = new 
DefaultConfigurableOptionsFactory()
+                       .setMaxBackgroundThreads(4)
+                       .setMaxOpenFiles(-1)
+                       .setCompactionStyle(CompactionStyle.LEVEL)
+                       .setUseDynamicLevelSize(true)
+                       .setTargetFileSizeBase("4MB")
+                       .setMaxSizeLevelBase("128 mb")
+                       .setWriteBufferSize("128 MB")
+                       .setMaxWriteBufferNumber(4)
+                       .setMinWriteBufferNumberToMerge(3)
+                       .setBlockSize("64KB")
+                       .setBlockCacheSize("512mb");
+
+               rocksDbBackend.setOptions(customizedOptions);
+
+               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+                       assertEquals(-1, dbOptions.maxOpenFiles());
+               }
+
+               try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                       
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                       assertEquals(4 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                       assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                       assertEquals(4, columnOptions.maxWriteBufferNumber());
+                       assertEquals(3, 
columnOptions.minWriteBufferNumberToMerge());
+
+                       BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                       assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+                       assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+               }
+       }
+
+       @Test
+       public void testConfigurableOptionsFromConfig() throws IOException {
+               Configuration configuration = new Configuration();
+               DefaultConfigurableOptionsFactory defaultOptionsFactory = new 
DefaultConfigurableOptionsFactory();
+               
assertTrue(defaultOptionsFactory.configure(configuration).getConfiguredOptions().isEmpty());
+
+               // verify illegal configuration
+               {
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "-1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
+               }
+
+               // verify legal configuration
+               {
+                       
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
+                       
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
"TRUE");
+                       
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 
mb");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, 
"128MB");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
+                       
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "2");
+                       
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
+
+                       DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
+                       optionsFactory.configure(configuration);
+                       String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+                       rocksDbBackend.setOptions(optionsFactory);
+
+                       try (DBOptions dbOptions = 
rocksDbBackend.getDbOptions()) {
+                               assertEquals(-1, dbOptions.maxOpenFiles());
+                       }
+
+                       try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                               assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                               
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                               assertEquals(8 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                               assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                               assertEquals(4, 
columnOptions.maxWriteBufferNumber());
+                               assertEquals(2, 
columnOptions.minWriteBufferNumberToMerge());
+                               assertEquals(64 * SizeUnit.MB, 
columnOptions.writeBufferSize());
+
+                               BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                               assertEquals(4 * SizeUnit.KB, 
tableConfig.blockSize());
+                               assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+                       }
+               }
+       }
+
+       @Test
        public void testOptionsFactory() throws Exception {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
 
+               // verify that user-defined options factory could be configured 
via flink-conf.yaml
+               Configuration config = new Configuration();
+               config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), 
TestOptionsFactory.class.getName());
+               config.setInteger(TestOptionsFactory.BACKGROUND_JOBS_OPTION, 4);
+
+               rocksDbBackend = rocksDbBackend.configure(config, 
getClass().getClassLoader());
+
+               assertTrue(rocksDbBackend.getOptions() instanceof 
TestOptionsFactory);
+               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+                       assertEquals(4, dbOptions.maxBackgroundJobs());
+               }
+
+               // verify that user-defined options factory could be set 
programmatically and override pre-configured one.
                rocksDbBackend.setOptions(new OptionsFactory() {
                        @Override
                        public DBOptions createDBOptions(DBOptions 
currentOptions) {
@@ -543,4 +671,45 @@ public class RocksDBStateBackendConfigTest {
 
                return env;
        }
+
+       private void verifyIllegalArgument(
+                       ConfigOption<String> configOption,
+                       String configValue) {
+               Configuration configuration = new Configuration();
+               configuration.setString(configOption, configValue);
+
+               DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
+               try {
+                       optionsFactory.configure(configuration);
+                       fail("Not throwing expected IllegalArgumentException.");
+               } catch (IllegalArgumentException e) {
+                       // ignored
+               }
+       }
+
+       /**
+        * An implementation of options factory for testing.
+        */
+       public static class TestOptionsFactory implements 
ConfigurableOptionsFactory {
+               public static final String BACKGROUND_JOBS_OPTION = 
"my.custom.rocksdb.backgroundJobs";
+
+               private static final int DEFAULT_BACKGROUND_JOBS = 2;
+               private int backgroundJobs = DEFAULT_BACKGROUND_JOBS;
+
+               @Override
+               public DBOptions createDBOptions(DBOptions currentOptions) {
+                       return 
currentOptions.setMaxBackgroundJobs(backgroundJobs);
+               }
+
+               @Override
+               public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions) {
+                       return 
currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+               }
+
+               @Override
+               public OptionsFactory configure(Configuration configuration) {
+                       this.backgroundJobs = 
configuration.getInteger(BACKGROUND_JOBS_OPTION, DEFAULT_BACKGROUND_JOBS);
+                       return this;
+               }
+       }
 }

Reply via email to