This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d2d92e  [FLINK-24046][state] Refactor the EmbeddedRocksDBStateBackend 
configuration logic
2d2d92e is described below

commit 2d2d92e9812851091d7cee9c9c1764a0f7b4fdc8
Author: Zakelly <[email protected]>
AuthorDate: Fri Nov 26 17:34:11 2021 +0800

    [FLINK-24046][state] Refactor the EmbeddedRocksDBStateBackend configuration 
logic
    
    This closes #17874.
---
 docs/content.zh/docs/ops/state/state_backends.md   |  12 +-
 docs/content/docs/ops/state/state_backends.md      |  10 +-
 .../generated/expert_rocksdb_section.html          |   4 +-
 .../rocksdb_configurable_configuration.html        |  66 +++----
 .../generated/rocksdb_configuration.html           |   4 +-
 flink-python/pyflink/datastream/state_backend.py   |   6 +-
 .../pyflink/datastream/tests/test_state_backend.py |   5 +-
 flink-python/pyflink/pyflink_gateway_server.py     |   2 +
 .../flink-statebackend-rocksdb/pom.xml             |   2 +
 .../state/DefaultConfigurableOptionsFactory.java   |   7 +
 .../state/EmbeddedRocksDBStateBackend.java         | 123 ++++++++----
 .../contrib/streaming/state/PredefinedOptions.java | 213 ++++++++-------------
 .../state/RocksDBConfigurableOptions.java          | 159 +++++++++++----
 .../contrib/streaming/state/RocksDBOptions.java    |   9 +-
 .../streaming/state/RocksDBResourceContainer.java  | 149 +++++++++++++-
 .../contrib/streaming/state/RocksDBResource.java   |  22 ++-
 .../state/RocksDBStateBackendConfigTest.java       | 105 +++-------
 17 files changed, 538 insertions(+), 360 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/state_backends.md 
b/docs/content.zh/docs/ops/state/state_backends.md
index e738b8f..0cf29c8 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -241,6 +241,11 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
 
 该选项的默认值是 `DEFAULT` ,对应 `PredefinedOptions.DEFAULT` 。
 
+#### 从 flink-conf.yaml 中读取列族选项
+
+RocksDB State Backend 会将 [这里定义]({{< ref "docs/deployment/config" 
>}}#advanced-rocksdb-state-backends-options) 的所有配置项全部加载。
+因此您可以简单的通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。
+
 ### 通过 RocksDBOptionsFactory 配置 RocksDB 选项
 
 <span class="label label-info">注意</span> 在引入 [RocksDB 
使用托管内存](#memory-management) 功能后,此机制应限于在*专家调优*或*故障处理*中使用。
@@ -259,13 +264,6 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*(
 而不是从JVM分配内存。分配给 RocksDB 的任何内存都必须被考虑在内,通常需要将这部分内存从任务管理器(`TaskManager`)的JVM堆中减去。
 不这样做可能会导致JVM进程由于分配的内存超过申请值而被 YARN 等资源管理框架终止。
 
-**从 flink-conf.yaml 中读取列族选项**
-
-一个实现了 `ConfigurableRocksDBOptionsFactory` 接口的 `RocksDBOptionsFactory` 
可以直接从配置文件(`flink-conf.yaml`)中读取设定。
-
-`state.backend.rocksdb.options-factory` 的默认配置是 
`org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`,它默认会将
 [这里定义]({{< ref "docs/deployment/config" 
>}}#advanced-rocksdb-state-backends-options) 的所有配置项全部加载。
-因此您可以简单的通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。
-
 下面是自定义 `ConfigurableRocksDBOptionsFactory` 的一个示例 (开发完成后,请将您的实现类全名设置到 
`state.backend.rocksdb.options-factory`).
 
 ```java
diff --git a/docs/content/docs/ops/state/state_backends.md 
b/docs/content/docs/ops/state/state_backends.md
index ee9861c..d3bb157 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -264,6 +264,10 @@ The default value for this option is `DEFAULT` which 
translates to `PredefinedOp
 
 Predefined options set programmatically would override the ones configured via 
`flink-conf.yaml`.
 
+#### Reading Column Family Options from flink-conf.yaml
+
+RocksDB State Backend picks up all config options [defined here]({{< ref 
"docs/deployment/config" >}}#advanced-rocksdb-state-backends-options). Hence, 
you can configure low-level Column Family options simply by turning off managed 
memory for RocksDB and putting the relevant entries in the configuration.
+
 #### Passing Options Factory to RocksDB
 
 To manually control RocksDB's options, you need to configure an 
`RocksDBOptionsFactory`. This mechanism gives you fine-grained control over the 
settings of the Column Families, for example memory use, thread, compaction 
settings, etc. There is currently one Column Family per each state in each 
operator.
@@ -282,12 +286,6 @@ and not from the JVM. Any memory you assign to RocksDB 
will have to be accounted
 of the TaskManagers by the same amount. Not doing that may result in YARN/etc 
terminating the JVM processes for
 allocating more memory than configured.
 
-**Reading Column Family Options from flink-conf.yaml**
-
-When a `RocksDBOptionsFactory` implements the 
`ConfigurableRocksDBOptionsFactory` interface, it can directly read settings 
from the configuration (`flink-conf.yaml`).
-
-The default value for `state.backend.rocksdb.options-factory` is in fact 
`org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory` 
which picks up all config options [defined here]({{< ref 
"docs/deployment/config" >}}#advanced-rocksdb-state-backends-options) by 
default. Hence, you can configure low-level Column Family options simply by 
turning off managed memory for RocksDB and putting the relevant entries in the 
configuration.
-
 Below is an example how to define a custom ConfigurableOptionsFactory (set 
class name under `state.backend.rocksdb.options-factory`).
 
 ```java
diff --git a/docs/layouts/shortcodes/generated/expert_rocksdb_section.html 
b/docs/layouts/shortcodes/generated/expert_rocksdb_section.html
index ce02013..aeece4c 100644
--- a/docs/layouts/shortcodes/generated/expert_rocksdb_section.html
+++ b/docs/layouts/shortcodes/generated/expert_rocksdb_section.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.options-factory</h5></td>
-            <td style="word-wrap: 
break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The options factory class for RocksDB to create DBOptions and 
ColumnFamilyOptions. The default options factory is 
org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and 
it would read the configured options which provided in 
'RocksDBConfigurableOptions'.</td>
+            <td>The options factory class for users to add customized options 
in DBOptions and ColumnFamilyOptions for RocksDB. If set, the RocksDB state 
backend will load the class and apply configs to DBOptions and 
ColumnFamilyOptions after loading ones from 'RocksDBConfigurableOptions' and 
pre-defined options.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.predefined-options</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index 56b2f9c..a8a974b 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -10,99 +10,99 @@
     <tbody>
         <tr>
             <td><h5>state.backend.rocksdb.block.blocksize</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">4 kb</td>
             <td>MemorySize</td>
-            <td>The approximate size (in bytes) of user data packed per block. 
RocksDB has default blocksize as '4KB'.</td>
+            <td>The approximate size (in bytes) of user data packed per block. 
The default blocksize is '4KB'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.block.cache-size</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">8 mb</td>
             <td>MemorySize</td>
-            <td>The amount of the cache for data blocks in RocksDB. RocksDB 
has default block-cache size as '8MB'.</td>
+            <td>The amount of the cache for data blocks in RocksDB. The 
default block-cache size is '8MB'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.block.metadata-blocksize</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">4 kb</td>
             <td>MemorySize</td>
-            <td>Approximate size of partitioned metadata packed per block. 
Currently applied to indexes block when partitioned index/filters option is 
enabled. RocksDB has default metadata blocksize as '4KB'.</td>
+            <td>Approximate size of partitioned metadata packed per block. 
Currently applied to indexes block when partitioned index/filters option is 
enabled. The default blocksize is '4KB'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.bloom-filter.bits-per-key</h5></td>
             <td style="word-wrap: break-word;">10.0</td>
             <td>Double</td>
-            <td>Bits per key that bloom filter will use, this only take effect 
when bloom filter is used.</td>
+            <td>Bits per key that bloom filter will use, this only take effect 
when bloom filter is used. The default value is 10.0.</td>
         </tr>
         <tr>
             
<td><h5>state.backend.rocksdb.bloom-filter.block-based-mode</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If true, RocksDB will use block-based filter instead of full 
filter, this only take effect when bloom filter is used.</td>
+            <td>If true, RocksDB will use block-based filter instead of full 
filter, this only take effect when bloom filter is used. The default value is 
'false'.</td>
         </tr>
         <tr>
             
<td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">256 mb</td>
             <td>MemorySize</td>
-            <td>The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '256MB'.</td>
+            <td>The upper-bound of the total size of level base files in 
bytes. The default value is '256MB'.</td>
         </tr>
         <tr>
             
<td><h5>state.backend.rocksdb.compaction.level.target-file-size-base</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">64 mb</td>
             <td>MemorySize</td>
-            <td>The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '64MB'.</td>
+            <td>The target file size for compaction, which determines a 
level-1 file size. The default value is '64MB'.</td>
         </tr>
         <tr>
             
<td><h5>state.backend.rocksdb.compaction.level.use-dynamic-size</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If true, RocksDB will pick target size of each level 
dynamically. From an empty DB, RocksDB would make last level the base level, 
which means merging L0 data into the last level, until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. RocksDB has default configuration as 'false'. For more information, 
please refer to <a 
href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is
 [...]
+            <td>If true, RocksDB will pick target size of each level 
dynamically. From an empty DB, RocksDB would make last level the base level, 
which means merging L0 data into the last level, until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. The default value is 'false'. For more information, please refer to 
<a 
href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true";>RocksDB's
 [...]
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.compaction.style</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">LEVEL</td>
             <td><p>Enum</p></td>
-            <td>The specified compaction style for DB. Candidate compaction 
style is LEVEL, FIFO, UNIVERSAL or NONE, and RocksDB choose 'LEVEL' as default 
style.<br /><br />Possible 
values:<ul><li>"LEVEL"</li><li>"UNIVERSAL"</li><li>"FIFO"</li><li>"NONE"</li></ul></td>
+            <td>The specified compaction style for DB. Candidate compaction 
style is LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as default 
style.<br /><br />Possible 
values:<ul><li>"LEVEL"</li><li>"UNIVERSAL"</li><li>"FIFO"</li><li>"NONE"</li></ul></td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.files.open</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>The maximum number of open files (per stateful operator) that 
can be used by the DB, '-1' means no limit. RocksDB has default configuration 
as '-1'.</td>
+            <td>The maximum number of open files (per stateful operator) that 
can be used by the DB, '-1' means no limit. The default value is '-1'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.log.dir</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The directory for RocksDB's information logging files. If 
empty (RocksDB default setting), log files will be in the same directory as 
data files. If non-empty, this directory will be used and the data directory's 
absolute path will be used as the prefix of the log file name.</td>
+            <td>The directory for RocksDB's information logging files. If 
empty (Flink default setting), log files will be in the same directory as data 
files. If non-empty, this directory will be used and the data directory's 
absolute path will be used as the prefix of the log file name.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.log.file-num</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">1000</td>
             <td>Integer</td>
-            <td>The maximum number of files RocksDB should keep for 
information logging (RocksDB default setting: 1000).</td>
+            <td>The maximum number of files RocksDB should keep for 
information logging (Default setting: 1000).</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.log.level</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">HEADER_LEVEL</td>
             <td><p>Enum</p></td>
             <td>The specified information logging level for RocksDB. If unset, 
Flink will use <code class="highlighter-rouge">HEADER_LEVEL</code>.<br />Note: 
RocksDB info logs will not be written to the TaskManager logs and there is no 
rolling strategy, unless you configure <code 
class="highlighter-rouge">state.backend.rocksdb.log.dir</code>, <code 
class="highlighter-rouge">state.backend.rocksdb.log.max-file-size</code>, and 
<code class="highlighter-rouge">state.backend.rocksdb.log.file- [...]
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.log.max-file-size</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">0 bytes</td>
             <td>MemorySize</td>
-            <td>The maximum size of RocksDB's file used for information 
logging. If the log files becomes larger than this, a new file will be created. 
If 0 (RocksDB default setting), all logs will be written to one log file.</td>
+            <td>The maximum size of RocksDB's file used for information 
logging. If the log files becomes larger than this, a new file will be created. 
If 0 (Flink default setting), all logs will be written to one log file.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.thread.num</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">2</td>
             <td>Integer</td>
-            <td>The maximum number of concurrent background flush and 
compaction jobs (per stateful operator). RocksDB has default configuration as 
'2'.</td>
+            <td>The maximum number of concurrent background flush and 
compaction jobs (per stateful operator). The default value is '2'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.use-bloom-filter</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If true, every newly created SST file will contain a Bloom 
filter. RocksDB disables it by default.</td>
+            <td>If true, every newly created SST file will contain a Bloom 
filter. It is disabled by default.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.write-batch-size</h5></td>
@@ -112,21 +112,21 @@
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.writebuffer.count</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">2</td>
             <td>Integer</td>
-            <td>The maximum number of write buffers that are built up in 
memory. RocksDB has default configuration as '2'.</td>
+            <td>The maximum number of write buffers that are built up in 
memory. The default value is '2'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.writebuffer.number-to-merge</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The minimum number of write buffers that will be merged 
together before writing to storage. RocksDB has default configuration as 
'1'.</td>
+            <td>The minimum number of write buffers that will be merged 
together before writing to storage. The default value is '1'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.writebuffer.size</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">64 mb</td>
             <td>MemorySize</td>
-            <td>The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. RocksDB has default 
writebuffer size as '64MB'.</td>
+            <td>The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. The default 
writebuffer size is '64MB'.</td>
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index 0bc696c..1f5c817 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -52,9 +52,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.options-factory</h5></td>
-            <td style="word-wrap: 
break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The options factory class for RocksDB to create DBOptions and 
ColumnFamilyOptions. The default options factory is 
org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and 
it would read the configured options which provided in 
'RocksDBConfigurableOptions'.</td>
+            <td>The options factory class for users to add customized options 
in DBOptions and ColumnFamilyOptions for RocksDB. If set, the RocksDB state 
backend will load the class and apply configs to DBOptions and 
ColumnFamilyOptions after loading ones from 'RocksDBConfigurableOptions' and 
pre-defined options.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.predefined-options</h5></td>
diff --git a/flink-python/pyflink/datastream/state_backend.py 
b/flink-python/pyflink/datastream/state_backend.py
index d61523f..6b04308 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -319,9 +319,9 @@ class EmbeddedRocksDBStateBackend(StateBackend):
         they must be specified through a factory.
 
         The options created by the factory here are applied on top of the 
pre-defined
-        options profile selected via :func:`set_predefined_options`.
-        If the pre-defined options profile is the default 
(:data:`PredefinedOptions.DEFAULT`),
-        then the factory fully controls the RocksDB options.
+        options profile selected via :func:`set_predefined_options`  and 
user-configured
+        options from configuration set through flink-conf.yaml with keys in
+        ``RocksDBConfigurableOptions``.
 
         :param options_factory_class_name: The fully-qualified class name of 
the options
                                            factory in Java that lazily creates 
the RocksDB options.
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py 
b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 2b6985e..951b8a9 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -159,11 +159,12 @@ class EmbeddedRocksDBStateBackendTests(PyFlinkTestCase):
         self.assertIsNone(state_backend.get_options())
 
         state_backend.set_options(
-            
"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory")
+            "org.apache.flink.contrib.streaming.state."
+            "RocksDBStateBackendConfigTest$TestOptionsFactory")
 
         self.assertEqual(state_backend.get_options(),
                          "org.apache.flink.contrib.streaming.state."
-                         "DefaultConfigurableOptionsFactory")
+                         "RocksDBStateBackendConfigTest$TestOptionsFactory")
 
     def test_get_set_number_of_transfer_threads(self):
 
diff --git a/flink-python/pyflink/pyflink_gateway_server.py 
b/flink-python/pyflink/pyflink_gateway_server.py
index a551f83..7493d53 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -256,6 +256,8 @@ def construct_test_classpath():
         "flink-formats/flink-json/target/flink-json*.jar",
         "flink-python/target/artifacts/testDataStream.jar",
         "flink-python/target/flink-python*-tests.jar",
+        ("flink-state-backends/flink-statebackend-rocksdb/target/"
+         "flink-statebackend-rocksdb*tests.jar"),
     ]
     test_jars = []
     flink_source_root = _find_flink_source_root()
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml 
b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 43d5751..07f8ba4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -98,6 +98,8 @@ under the License.
                                                        <includes>
                                                                
<include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include>
                                                                
<include>**/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*</include>
+                                                               <!-- Exporting 
RocksDBStateBackendConfigTest$TestOptionsFactory for pyflink tests -->
+                                                               
<include>**/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*</include>
                                                                
<include>**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*</include>
                                                                
<include>META-INF/LICENSE</include>
                                                                
<include>META-INF/NOTICE</include>
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 7308720..0f1f708 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -65,7 +65,14 @@ import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption
  * An implementation of {@link ConfigurableRocksDBOptionsFactory} using 
options provided by {@link
  * RocksDBConfigurableOptions}. It acts as the default options factory within 
{@link
  * EmbeddedRocksDBStateBackend} if the user did not define a {@link 
RocksDBOptionsFactory}.
+ *
+ * <p>After FLINK-24046, we refactor the config procedure for RocksDB. User 
could use {@link
+ * ConfigurableRocksDBOptionsFactory} to apply some customized options. 
Besides this, we load the
+ * configurable options in {@link RocksDBResourceContainer} instead of {@link
+ * DefaultConfigurableOptionsFactory}. It is ignored for general case and 
still kept for backward
+ * compatibility if user still leverage this class. Thus, we mark this factory 
Deprecated.
  */
+@Deprecated
 public class DefaultConfigurableOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index ba71736..bc1da60 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.ReadableConfig;
@@ -69,6 +71,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 
@@ -122,6 +125,9 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
     /** The pre-configured option settings. */
     @Nullable private PredefinedOptions predefinedOptions;
 
+    /** The configurable options. */
+    @Nullable private ReadableConfig configurableOptions;
+
     /** The options factory to create the RocksDB options in the cluster. */
     @Nullable private RocksDBOptionsFactory rocksDbOptionsFactory;
 
@@ -256,6 +262,9 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                         : original.predefinedOptions;
         LOG.info("Using predefined options: {}.", predefinedOptions.name());
 
+        // configurable options
+        this.configurableOptions = 
mergeConfigurableOptions(original.configurableOptions, config);
+
         // configure RocksDB options factory
         try {
             rocksDbOptionsFactory =
@@ -484,11 +493,13 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
 
     private RocksDBOptionsFactory configureOptionsFactory(
             @Nullable RocksDBOptionsFactory originalOptionsFactory,
-            String factoryClassName,
+            @Nullable String factoryClassName,
             ReadableConfig config,
             ClassLoader classLoader)
             throws DynamicCodeLoadingException {
 
+        RocksDBOptionsFactory optionsFactory = null;
+
         if (originalOptionsFactory != null) {
             if (originalOptionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
                 originalOptionsFactory =
@@ -497,45 +508,58 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
             }
             LOG.info("Using application-defined options factory: {}.", 
originalOptionsFactory);
 
-            return originalOptionsFactory;
-        }
-
-        // if using DefaultConfigurableOptionsFactory by default, we could 
avoid reflection to speed
-        // up.
-        if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(config);
-            LOG.info("Using default options factory: {}.", optionsFactory);
-
-            return optionsFactory;
-        } else {
-            try {
-                Class<? extends RocksDBOptionsFactory> clazz =
-                        Class.forName(factoryClassName, false, classLoader)
-                                .asSubclass(RocksDBOptionsFactory.class);
-
-                RocksDBOptionsFactory optionsFactory = clazz.newInstance();
-                if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
-                    optionsFactory =
-                            ((ConfigurableRocksDBOptionsFactory) 
optionsFactory).configure(config);
+            optionsFactory = originalOptionsFactory;
+        } else if (factoryClassName != null) {
+            // Do nothing if user does not define any factory class.
+            if (factoryClassName.equalsIgnoreCase(
+                    DefaultConfigurableOptionsFactory.class.getName())) {
+                // From FLINK-24046, we deprecate the 
DefaultConfigurableOptionsFactory.
+                LOG.warn(
+                        "{} is deprecated. Please remove this value from the 
configuration."
+                                + "It is safe to do so since the configurable 
options will be loaded "
+                                + "in other place. For more information, 
please refer to FLINK-24046.",
+                        DefaultConfigurableOptionsFactory.class.getName());
+            } else {
+                try {
+                    Class<? extends RocksDBOptionsFactory> clazz =
+                            Class.forName(factoryClassName, false, classLoader)
+                                    .asSubclass(RocksDBOptionsFactory.class);
+
+                    optionsFactory = clazz.newInstance();
+                    if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
+                        optionsFactory =
+                                ((ConfigurableRocksDBOptionsFactory) 
optionsFactory)
+                                        .configure(config);
+                    }
+                    LOG.info("Using configured options factory: {}.", 
optionsFactory);
+
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured options factory class: " + 
factoryClassName, e);
+                } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + RocksDBOptions.OPTIONS_FACTORY.key()
+                                    + "' is not a valid options factory ("
+                                    + factoryClassName
+                                    + ')',
+                            e);
                 }
-                LOG.info("Using configured options factory: {}.", 
optionsFactory);
-
-                return optionsFactory;
-            } catch (ClassNotFoundException e) {
-                throw new DynamicCodeLoadingException(
-                        "Cannot find configured options factory class: " + 
factoryClassName, e);
-            } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
-                throw new DynamicCodeLoadingException(
-                        "The class configured under '"
-                                + RocksDBOptions.OPTIONS_FACTORY.key()
-                                + "' is not a valid options factory ("
-                                + factoryClassName
-                                + ')',
-                        e);
             }
         }
+
+        if (optionsFactory instanceof DefaultConfigurableOptionsFactory) {
+            LOG.warn(
+                    "{} is extending from {}, which is deprecated and will be 
removed in the "
+                            + "future. It is highly recommended to directly 
implement the "
+                            + "ConfigurableRocksDBOptionsFactory without 
extending the {}. "
+                            + "For more information, please refer to 
FLINK-24046.",
+                    optionsFactory,
+                    DefaultConfigurableOptionsFactory.class.getName(),
+                    DefaultConfigurableOptionsFactory.class.getName());
+        }
+
+        return optionsFactory;
     }
 
     // ------------------------------------------------------------------------
@@ -717,9 +741,9 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
      * serializable and hold native code references, they must be specified 
through a factory.
      *
      * <p>The options created by the factory here are applied on top of the 
pre-defined options
-     * profile selected via {@link #setPredefinedOptions(PredefinedOptions)}. 
If the pre-defined
-     * options profile is the default ({@link PredefinedOptions#DEFAULT}), 
then the factory fully
-     * controls the RocksDB options.
+     * profile selected via {@link #setPredefinedOptions(PredefinedOptions)} 
and user-configured
+     * options from configuration set by {@link #configure(ReadableConfig, 
ClassLoader)} with keys
+     * in {@link RocksDBConfigurableOptions}.
      *
      * @param optionsFactory The options factory that lazily creates the 
RocksDB options.
      */
@@ -782,6 +806,24 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
     //  utilities
     // ------------------------------------------------------------------------
 
+    private ReadableConfig mergeConfigurableOptions(ReadableConfig base, 
ReadableConfig onTop) {
+        if (base == null) {
+            base = new Configuration();
+        }
+        Configuration configuration = new Configuration();
+        for (ConfigOption<?> option : 
RocksDBConfigurableOptions.CANDIDATE_CONFIGS) {
+            Optional<?> baseValue = base.getOptional(option);
+            Optional<?> topValue = onTop.getOptional(option);
+
+            if (topValue.isPresent() || baseValue.isPresent()) {
+                Object validValue = topValue.isPresent() ? topValue.get() : 
baseValue.get();
+                RocksDBConfigurableOptions.checkArgumentValid(option, 
validValue);
+                configuration.setString(option.key(), validValue.toString());
+            }
+        }
+        return configuration;
+    }
+
     @VisibleForTesting
     RocksDBResourceContainer createOptionsAndResourceContainer() {
         return createOptionsAndResourceContainer(null);
@@ -792,6 +834,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
 
         return new RocksDBResourceContainer(
+                configurableOptions != null ? configurableOptions : new 
Configuration(),
                 predefinedOptions != null ? predefinedOptions : 
PredefinedOptions.DEFAULT,
                 rocksDbOptionsFactory,
                 sharedResources);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 11d5363..c6d2e18 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -18,15 +18,17 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.BloomFilter;
-import org.rocksdb.ColumnFamilyOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
+
 import org.rocksdb.CompactionStyle;
-import org.rocksdb.DBOptions;
 import org.rocksdb.InfoLogLevel;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * The {@code PredefinedOptions} are configuration settings for the {@link
@@ -38,6 +40,11 @@ import java.util.Collection;
  *
  * <p>All of them effectively disable the RocksDB log by default because this 
file would grow
  * indefinitely and will be deleted with the TM anyway.
+ *
+ * <p>The {@code PredefinedOptions} are designed to cope with different 
situations. If some
+ * configurations should be enabled unconditionally, they are not included in 
any of the pre-defined
+ * options. Please check {@link 
RocksDBResourceContainer#createBaseCommonDBOptions()} and {@link
+ * RocksDBResourceContainer#createBaseCommonColumnOptions()} for common 
settings.
  */
 public enum PredefinedOptions {
 
@@ -50,26 +57,12 @@ public enum PredefinedOptions {
      * <p>The following options are set:
      *
      * <ul>
-     *   <li>setUseFsync(false)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      */
-    DEFAULT {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setUseFsync(false)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
-
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-            return new ColumnFamilyOptions();
-        }
-    },
+    DEFAULT(
+            Collections.singletonMap(
+                    RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL)),
 
     /**
      * Pre-defined options for regular spinning hard disks.
@@ -83,35 +76,26 @@ public enum PredefinedOptions {
      *   <li>setCompactionStyle(CompactionStyle.LEVEL)
      *   <li>setLevelCompactionDynamicLevelBytes(true)
      *   <li>setIncreaseParallelism(4)
-     *   <li>setUseFsync(false)
      *   <li>setDisableDataSync(true)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      *
      * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery, there is no need
      * to sync data to stable storage.
      */
-    SPINNING_DISK_OPTIMIZED {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setIncreaseParallelism(4)
-                    .setUseFsync(false)
-                    .setMaxOpenFiles(-1)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
-
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-            return new ColumnFamilyOptions()
-                    .setCompactionStyle(CompactionStyle.LEVEL)
-                    .setLevelCompactionDynamicLevelBytes(true);
-        }
-    },
+    SPINNING_DISK_OPTIMIZED(
+            new HashMap<ConfigOption<?>, Object>() {
+                private static final long serialVersionUID = 1L;
+
+                {
+                    put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
+                    put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
+                    put(RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL);
+                    put(RocksDBConfigurableOptions.COMPACTION_STYLE, 
CompactionStyle.LEVEL);
+                    put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
true);
+                }
+            }),
 
     /**
      * Pre-defined options for better performance on regular spinning hard 
disks, at the cost of a
@@ -131,55 +115,37 @@ public enum PredefinedOptions {
      *   <li>setIncreaseParallelism(4)
      *   <li>setMinWriteBufferNumberToMerge(3)
      *   <li>setMaxWriteBufferNumber(4)
-     *   <li>setUseFsync(false)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      *   <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
-     *   <li>BlockBasedTableConfigsetBlockSize(128 KBytes)
+     *   <li>BlockBasedTableConfig.setBlockSize(128 KBytes)
      * </ul>
      *
      * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery, there is no need
      * to sync data to stable storage.
      */
-    SPINNING_DISK_OPTIMIZED_HIGH_MEM {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setIncreaseParallelism(4)
-                    .setUseFsync(false)
-                    .setMaxOpenFiles(-1)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
-
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-
-            final long blockCacheSize = 256 * 1024 * 1024;
-            final long blockSize = 128 * 1024;
-            final long targetFileSize = 256 * 1024 * 1024;
-            final long writeBufferSize = 64 * 1024 * 1024;
-
-            BloomFilter bloomFilter = new BloomFilter();
-            handlesToClose.add(bloomFilter);
-
-            return new ColumnFamilyOptions()
-                    .setCompactionStyle(CompactionStyle.LEVEL)
-                    .setLevelCompactionDynamicLevelBytes(true)
-                    .setTargetFileSizeBase(targetFileSize)
-                    .setMaxBytesForLevelBase(4 * targetFileSize)
-                    .setWriteBufferSize(writeBufferSize)
-                    .setMinWriteBufferNumberToMerge(3)
-                    .setMaxWriteBufferNumber(4)
-                    .setTableFormatConfig(
-                            new BlockBasedTableConfig()
-                                    .setBlockCacheSize(blockCacheSize)
-                                    .setBlockSize(blockSize)
-                                    .setFilter(bloomFilter));
-        }
-    },
+    SPINNING_DISK_OPTIMIZED_HIGH_MEM(
+            new HashMap<ConfigOption<?>, Object>() {
+                private static final long serialVersionUID = 1L;
+
+                {
+                    put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
+                    put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
+                    put(RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL);
+                    put(RocksDBConfigurableOptions.COMPACTION_STYLE, 
CompactionStyle.LEVEL);
+                    put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
true);
+                    put(
+                            RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE,
+                            MemorySize.parse("256mb"));
+                    put(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, 
MemorySize.parse("1gb"));
+                    put(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, 
MemorySize.parse("64mb"));
+                    
put(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, 3);
+                    put(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 4);
+                    put(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, 
MemorySize.parse("256mb"));
+                    put(RocksDBConfigurableOptions.BLOCK_SIZE, 
MemorySize.parse("128kb"));
+                    put(RocksDBConfigurableOptions.USE_BLOOM_FILTER, true);
+                }
+            }),
 
     /**
      * Pre-defined options for Flash SSDs.
@@ -191,68 +157,55 @@ public enum PredefinedOptions {
      *
      * <ul>
      *   <li>setIncreaseParallelism(4)
-     *   <li>setUseFsync(false)
      *   <li>setDisableDataSync(true)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      *
      * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery, there is no need
      * to sync data to stable storage.
      */
-    FLASH_SSD_OPTIMIZED {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setIncreaseParallelism(4)
-                    .setUseFsync(false)
-                    .setMaxOpenFiles(-1)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
+    FLASH_SSD_OPTIMIZED(
+            new HashMap<ConfigOption<?>, Object>() {
+                private static final long serialVersionUID = 1L;
 
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-            return new ColumnFamilyOptions();
-        }
-    };
+                {
+                    put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
+                    put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
+                    put(RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL);
+                }
+            });
 
     // ------------------------------------------------------------------------
 
-    /**
-     * Creates the {@link DBOptions}for this pre-defined setting.
-     *
-     * @param handlesToClose The collection to register newly created {@link
-     *     org.rocksdb.RocksObject}s.
-     * @return The pre-defined options object.
-     */
-    public abstract DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose);
+    /** Settings kept in this pre-defined options. */
+    private final Map<String, Object> options;
 
-    /**
-     * @return The pre-defined options object.
-     * @deprecated use {@link #createColumnOptions(Collection)} instead.
-     */
-    public DBOptions createDBOptions() {
-        return createDBOptions(new ArrayList<>());
+    PredefinedOptions(Map<ConfigOption<?>, Object> initMap) {
+        options = new HashMap<>(initMap.size());
+        for (Map.Entry<ConfigOption<?>, Object> entry : initMap.entrySet()) {
+            options.put(entry.getKey().key(), entry.getValue());
+        }
     }
 
     /**
-     * Creates the {@link org.rocksdb.ColumnFamilyOptions}for this pre-defined 
setting.
+     * Get a option value according to the pre-defined values. If not defined, 
return the default
+     * value.
      *
-     * @param handlesToClose The collection to register newly created {@link
-     *     org.rocksdb.RocksObject}s.
-     * @return The pre-defined options object.
-     */
-    public abstract ColumnFamilyOptions createColumnOptions(
-            Collection<AutoCloseable> handlesToClose);
-
-    /**
-     * @return The pre-defined options object.
-     * @deprecated use {@link #createColumnOptions(Collection)} instead.
+     * @param option the option.
+     * @param <T> the option value type.
+     * @return the value if defined, otherwise return the default value.
      */
-    public ColumnFamilyOptions createColumnOptions() {
-        return createColumnOptions(new ArrayList<>());
+    @Nullable
+    @SuppressWarnings("unchecked")
+    <T> T getValue(ConfigOption<T> option) {
+        Object value = options.get(option.key());
+        if (value == null) {
+            value = option.defaultValue();
+        }
+        if (value == null) {
+            return null;
+        }
+        return (T) value;
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 955a86c..8084ef9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -21,11 +21,16 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.InfoLogLevel;
 
+import java.io.File;
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.LinkElement.link;
@@ -37,14 +42,11 @@ import static org.rocksdb.CompactionStyle.UNIVERSAL;
 import static org.rocksdb.InfoLogLevel.HEADER_LEVEL;
 
 /**
- * This class contains the configuration options for the {@link 
DefaultConfigurableOptionsFactory}.
+ * This class contains the configuration options for the {@link 
EmbeddedRocksDBStateBackend}.
  *
- * <p>If nothing specified, RocksDB's options would be configured by {@link 
PredefinedOptions} and
- * user-defined {@link RocksDBOptionsFactory}.
- *
- * <p>If some options has been specifically configured, a corresponding {@link
- * DefaultConfigurableOptionsFactory} would be created and applied on top of 
{@link
- * PredefinedOptions} except if a user-defined {@link RocksDBOptionsFactory} 
overrides it.
+ * <p>Currently, RocksDB's options would be configured by values here on top 
of {@link
+ * PredefinedOptions}, and then a user-defined {@link RocksDBOptionsFactory} 
may override the
+ * configurations here.
  */
 public class RocksDBConfigurableOptions implements Serializable {
 
@@ -55,34 +57,34 @@ public class RocksDBConfigurableOptions implements 
Serializable {
     public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
             key("state.backend.rocksdb.thread.num")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(2)
                     .withDescription(
                             "The maximum number of concurrent background flush 
and compaction jobs (per stateful operator). "
-                                    + "RocksDB has default configuration as 
'2'.");
+                                    + "The default value is '2'.");
 
     public static final ConfigOption<Integer> MAX_OPEN_FILES =
             key("state.backend.rocksdb.files.open")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(-1)
                     .withDescription(
                             "The maximum number of open files (per stateful 
operator) that can be used by the DB, '-1' means no limit. "
-                                    + "RocksDB has default configuration as 
'-1'.");
+                                    + "The default value is '-1'.");
 
     public static final ConfigOption<MemorySize> LOG_MAX_FILE_SIZE =
             key("state.backend.rocksdb.log.max-file-size")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.ZERO)
                     .withDescription(
                             "The maximum size of RocksDB's file used for 
information logging. "
                                     + "If the log files becomes larger than 
this, a new file will be created. "
-                                    + "If 0 (RocksDB default setting), all 
logs will be written to one log file.");
+                                    + "If 0 (Flink default setting), all logs 
will be written to one log file.");
 
     public static final ConfigOption<Integer> LOG_FILE_NUM =
             key("state.backend.rocksdb.log.file-num")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(1000)
                     .withDescription(
-                            "The maximum number of files RocksDB should keep 
for information logging (RocksDB default setting: 1000).");
+                            "The maximum number of files RocksDB should keep 
for information logging (Default setting: 1000).");
 
     public static final ConfigOption<String> LOG_DIR =
             key("state.backend.rocksdb.log.dir")
@@ -90,13 +92,13 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                     .noDefaultValue()
                     .withDescription(
                             "The directory for RocksDB's information logging 
files. "
-                                    + "If empty (RocksDB default setting), log 
files will be in the same directory as data files. "
+                                    + "If empty (Flink default setting), log 
files will be in the same directory as data files. "
                                     + "If non-empty, this directory will be 
used and the data directory's absolute path will be used as the prefix of the 
log file name.");
 
     public static final ConfigOption<InfoLogLevel> LOG_LEVEL =
             key("state.backend.rocksdb.log.level")
                     .enumType(InfoLogLevel.class)
-                    .noDefaultValue()
+                    .defaultValue(HEADER_LEVEL)
                     .withDescription(
                             Description.builder()
                                     .text(
@@ -124,11 +126,11 @@ public class RocksDBConfigurableOptions implements 
Serializable {
     public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
             key("state.backend.rocksdb.compaction.style")
                     .enumType(CompactionStyle.class)
-                    .noDefaultValue()
+                    .defaultValue(LEVEL)
                     .withDescription(
                             String.format(
                                     "The specified compaction style for DB. 
Candidate compaction style is %s, %s, %s or %s, "
-                                            + "and RocksDB choose '%s' as 
default style.",
+                                            + "and Flink chooses '%s' as 
default style.",
                                     LEVEL.name(),
                                     FIFO.name(),
                                     UNIVERSAL.name(),
@@ -138,7 +140,7 @@ public class RocksDBConfigurableOptions implements 
Serializable {
     public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE =
             key("state.backend.rocksdb.compaction.level.use-dynamic-size")
                     .booleanType()
-                    .noDefaultValue()
+                    .defaultValue(false)
                     .withDescription(
                             Description.builder()
                                     .text(
@@ -147,7 +149,7 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                                             "RocksDB would make last level the 
base level, which means merging L0 data into the last level, ")
                                     .text(
                                             "until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. ")
-                                    .text("RocksDB has default configuration 
as 'false'. ")
+                                    .text("The default value is 'false'. ")
                                     .text(
                                             "For more information, please 
refer to %s",
                                             link(
@@ -158,67 +160,67 @@ public class RocksDBConfigurableOptions implements 
Serializable {
     public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
             key("state.backend.rocksdb.compaction.level.target-file-size-base")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("64mb"))
                     .withDescription(
                             "The target file size for compaction, which 
determines a level-1 file size. "
-                                    + "RocksDB has default configuration as 
'64MB'.");
+                                    + "The default value is '64MB'.");
 
     public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
             key("state.backend.rocksdb.compaction.level.max-size-level-base")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("256mb"))
                     .withDescription(
                             "The upper-bound of the total size of level base 
files in bytes. "
-                                    + "RocksDB has default configuration as 
'256MB'.");
+                                    + "The default value is '256MB'.");
 
     public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
             key("state.backend.rocksdb.writebuffer.size")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("64mb"))
                     .withDescription(
                             "The amount of data built up in memory (backed by 
an unsorted log on disk) "
-                                    + "before converting to a sorted on-disk 
files. RocksDB has default writebuffer size as '64MB'.");
+                                    + "before converting to a sorted on-disk 
files. The default writebuffer size is '64MB'.");
 
     public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
             key("state.backend.rocksdb.writebuffer.count")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(2)
                     .withDescription(
                             "The maximum number of write buffers that are 
built up in memory. "
-                                    + "RocksDB has default configuration as 
'2'.");
+                                    + "The default value is '2'.");
 
     public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE 
=
             key("state.backend.rocksdb.writebuffer.number-to-merge")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(1)
                     .withDescription(
                             "The minimum number of write buffers that will be 
merged together before writing to storage. "
-                                    + "RocksDB has default configuration as 
'1'.");
+                                    + "The default value is '1'.");
 
     public static final ConfigOption<MemorySize> BLOCK_SIZE =
             key("state.backend.rocksdb.block.blocksize")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("4kb"))
                     .withDescription(
                             "The approximate size (in bytes) of user data 
packed per block. "
-                                    + "RocksDB has default blocksize as 
'4KB'.");
+                                    + "The default blocksize is '4KB'.");
 
     public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
             key("state.backend.rocksdb.block.metadata-blocksize")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("4kb"))
                     .withDescription(
                             "Approximate size of partitioned metadata packed 
per block. "
                                     + "Currently applied to indexes block when 
partitioned index/filters option is enabled. "
-                                    + "RocksDB has default metadata blocksize 
as '4KB'.");
+                                    + "The default blocksize is '4KB'.");
 
     public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
             key("state.backend.rocksdb.block.cache-size")
                     .memoryType()
-                    .noDefaultValue()
+                    .defaultValue(MemorySize.parse("8mb"))
                     .withDescription(
                             "The amount of the cache for data blocks in 
RocksDB. "
-                                    + "RocksDB has default block-cache size as 
'8MB'.");
+                                    + "The default block-cache size is 
'8MB'.");
 
     public static final ConfigOption<MemorySize> WRITE_BATCH_SIZE =
             key("state.backend.rocksdb.write-batch-size")
@@ -233,19 +235,96 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If true, every newly created SST file will 
contain a Bloom filter. RocksDB disables it by default.");
+                            "If true, every newly created SST file will 
contain a Bloom filter. "
+                                    + "It is disabled by default.");
 
     public static final ConfigOption<Double> BLOOM_FILTER_BITS_PER_KEY =
             key("state.backend.rocksdb.bloom-filter.bits-per-key")
                     .doubleType()
                     .defaultValue(10.0)
                     .withDescription(
-                            "Bits per key that bloom filter will use, this 
only take effect when bloom filter is used.");
+                            "Bits per key that bloom filter will use, this 
only take effect when bloom filter is used. "
+                                    + "The default value is 10.0.");
 
     public static final ConfigOption<Boolean> BLOOM_FILTER_BLOCK_BASED_MODE =
             key("state.backend.rocksdb.bloom-filter.block-based-mode")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used.");
+                            "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used. "
+                                    + "The default value is 'false'.");
+
+    static final ConfigOption<?>[] CANDIDATE_CONFIGS =
+            new ConfigOption<?>[] {
+                // configurable DBOptions
+                MAX_BACKGROUND_THREADS,
+                MAX_OPEN_FILES,
+                LOG_LEVEL,
+                LOG_MAX_FILE_SIZE,
+                LOG_FILE_NUM,
+                LOG_DIR,
+
+                // configurable ColumnFamilyOptions
+                COMPACTION_STYLE,
+                USE_DYNAMIC_LEVEL_SIZE,
+                TARGET_FILE_SIZE_BASE,
+                MAX_SIZE_LEVEL_BASE,
+                WRITE_BUFFER_SIZE,
+                MAX_WRITE_BUFFER_NUMBER,
+                MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
+                BLOCK_SIZE,
+                METADATA_BLOCK_SIZE,
+                BLOCK_CACHE_SIZE,
+                USE_BLOOM_FILTER,
+                BLOOM_FILTER_BITS_PER_KEY,
+                BLOOM_FILTER_BLOCK_BASED_MODE
+            };
+
+    private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
+            new HashSet<>(
+                    Arrays.asList(
+                            MAX_BACKGROUND_THREADS,
+                            LOG_FILE_NUM,
+                            MAX_WRITE_BUFFER_NUMBER,
+                            MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+    private static final Set<ConfigOption<?>> SIZE_CONFIG_SET =
+            new HashSet<>(
+                    Arrays.asList(
+                            TARGET_FILE_SIZE_BASE,
+                            MAX_SIZE_LEVEL_BASE,
+                            WRITE_BUFFER_SIZE,
+                            BLOCK_SIZE,
+                            METADATA_BLOCK_SIZE,
+                            BLOCK_CACHE_SIZE));
+
+    /**
+     * Helper method to check whether the (key,value) is valid through given 
configuration and
+     * returns the formatted value.
+     *
+     * @param option The configuration key which is configurable in {@link
+     *     RocksDBConfigurableOptions}.
+     * @param value The value within given configuration.
+     */
+    static void checkArgumentValid(ConfigOption<?> option, Object value) {
+        final String key = option.key();
+
+        if (POSITIVE_INT_CONFIG_SET.contains(option)) {
+            Preconditions.checkArgument(
+                    (Integer) value > 0,
+                    "Configured value for key: " + key + " must be larger than 
0.");
+        } else if (SIZE_CONFIG_SET.contains(option)) {
+            Preconditions.checkArgument(
+                    ((MemorySize) value).getBytes() > 0,
+                    "Configured size for key" + key + " must be larger than 
0.");
+        } else if (LOG_MAX_FILE_SIZE.equals(option)) {
+            Preconditions.checkArgument(
+                    ((MemorySize) value).getBytes() >= 0,
+                    "Configured size for key " + key + " must be larger than 
or equal to 0.");
+        } else if (LOG_DIR.equals(option)) {
+            Preconditions.checkArgument(
+                    new File((String) value).isAbsolute(),
+                    "Configured path for key " + key + " is not absolute.");
+        }
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 8ec535a..0cf3602 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -84,12 +84,11 @@ public class RocksDBOptions {
     public static final ConfigOption<String> OPTIONS_FACTORY =
             ConfigOptions.key("state.backend.rocksdb.options-factory")
                     .stringType()
-                    
.defaultValue(DefaultConfigurableOptionsFactory.class.getName())
+                    .noDefaultValue()
                     .withDescription(
-                            String.format(
-                                    "The options factory class for RocksDB to 
create DBOptions and ColumnFamilyOptions. "
-                                            + "The default options factory is 
%s, and it would read the configured options which provided in 
'RocksDBConfigurableOptions'.",
-                                    
DefaultConfigurableOptionsFactory.class.getName()));
+                            "The options factory class for users to add 
customized options in DBOptions and ColumnFamilyOptions for RocksDB. "
+                                    + "If set, the RocksDB state backend will 
load the class and apply configs to DBOptions and ColumnFamilyOptions "
+                                    + "after loading ones from 
'RocksDBConfigurableOptions' and pre-defined options.");
 
     @Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB)
     public static final ConfigOption<Boolean> USE_MANAGED_MEMORY =
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index d5917be..3cdb855 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -29,6 +32,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.Filter;
 import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
 import org.rocksdb.WriteOptions;
@@ -38,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,6 +56,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public final class RocksDBResourceContainer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBResourceContainer.class);
 
+    /** The configurations from file. */
+    private final ReadableConfig configuration;
+
     /** The pre-configured option settings. */
     private final PredefinedOptions predefinedOptions;
 
@@ -68,19 +76,28 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
     private final ArrayList<AutoCloseable> handlesToClose;
 
     public RocksDBResourceContainer() {
-        this(PredefinedOptions.DEFAULT, null, null);
+        this(new Configuration(), PredefinedOptions.DEFAULT, null, null);
     }
 
     public RocksDBResourceContainer(
             PredefinedOptions predefinedOptions, @Nullable 
RocksDBOptionsFactory optionsFactory) {
-        this(predefinedOptions, optionsFactory, null);
+        this(new Configuration(), predefinedOptions, optionsFactory, null);
+    }
+
+    public RocksDBResourceContainer(
+            PredefinedOptions predefinedOptions,
+            @Nullable RocksDBOptionsFactory optionsFactory,
+            @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
+        this(new Configuration(), predefinedOptions, optionsFactory, 
sharedResources);
     }
 
     public RocksDBResourceContainer(
+            ReadableConfig configuration,
             PredefinedOptions predefinedOptions,
             @Nullable RocksDBOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
 
+        this.configuration = configuration;
         this.predefinedOptions = checkNotNull(predefinedOptions);
         this.optionsFactory = optionsFactory;
         this.sharedResources = sharedResources;
@@ -89,10 +106,13 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
 
     /** Gets the RocksDB {@link DBOptions} to be used for RocksDB instances. */
     public DBOptions getDbOptions() {
-        // initial options from pre-defined profile
-        DBOptions opt = predefinedOptions.createDBOptions(handlesToClose);
+        // initial options from common profile
+        DBOptions opt = createBaseCommonDBOptions();
         handlesToClose.add(opt);
 
+        // load configurable options on top of pre-defined profile
+        setDBOptionsFromConfigurableOptions(opt);
+
         // add user-defined options factory, if specified
         if (optionsFactory != null) {
             opt = optionsFactory.createDBOptions(opt, handlesToClose);
@@ -125,10 +145,13 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
 
     /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances. */
     public ColumnFamilyOptions getColumnOptions() {
-        // initial options from pre-defined profile
-        ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions(handlesToClose);
+        // initial options from common profile
+        ColumnFamilyOptions opt = createBaseCommonColumnOptions();
         handlesToClose.add(opt);
 
+        // load configurable options on top of pre-defined profile
+        setColumnFamilyOptionsFromConfigurableOptions(opt, handlesToClose);
+
         // add user-defined options, if specified
         if (optionsFactory != null) {
             opt = optionsFactory.createColumnOptions(opt, handlesToClose);
@@ -234,4 +257,118 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
         }
         return true;
     }
+
+    /** Create a {@link DBOptions} for RocksDB, including some common 
settings. */
+    DBOptions createBaseCommonDBOptions() {
+        return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
+    }
+
+    /** Create a {@link ColumnFamilyOptions} for RocksDB, including some 
common settings. */
+    ColumnFamilyOptions createBaseCommonColumnOptions() {
+        return new ColumnFamilyOptions();
+    }
+
+    /**
+     * Get a value for option from pre-defined option and configurable option 
settings. The priority
+     * relationship is as below.
+     *
+     * <p>Configured value > pre-defined value > default value.
+     *
+     * @param option the wanted option
+     * @param <T> the value type
+     * @return the final value for the option according to the priority above.
+     */
+    @Nullable
+    private <T> T internalGetOption(ConfigOption<T> option) {
+        return configuration
+                .getOptional(option)
+                .orElseGet(() -> predefinedOptions.getValue(option));
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private DBOptions setDBOptionsFromConfigurableOptions(DBOptions 
currentOptions) {
+
+        currentOptions.setIncreaseParallelism(
+                
internalGetOption(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS));
+
+        currentOptions.setMaxOpenFiles(
+                internalGetOption(RocksDBConfigurableOptions.MAX_OPEN_FILES));
+
+        
currentOptions.setInfoLogLevel(internalGetOption(RocksDBConfigurableOptions.LOG_LEVEL));
+
+        String logDir = internalGetOption(RocksDBConfigurableOptions.LOG_DIR);
+        if (logDir != null && !logDir.isEmpty()) {
+            currentOptions.setDbLogDir(logDir);
+        }
+
+        currentOptions.setMaxLogFileSize(
+                
internalGetOption(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE).getBytes());
+
+        currentOptions.setKeepLogFileNum(
+                internalGetOption(RocksDBConfigurableOptions.LOG_FILE_NUM));
+
+        return currentOptions;
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
+            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> 
handlesToClose) {
+
+        currentOptions.setCompactionStyle(
+                
internalGetOption(RocksDBConfigurableOptions.COMPACTION_STYLE));
+
+        currentOptions.setLevelCompactionDynamicLevelBytes(
+                
internalGetOption(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE));
+
+        currentOptions.setTargetFileSizeBase(
+                
internalGetOption(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE).getBytes());
+
+        currentOptions.setMaxBytesForLevelBase(
+                
internalGetOption(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE).getBytes());
+
+        currentOptions.setWriteBufferSize(
+                
internalGetOption(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE).getBytes());
+
+        currentOptions.setMaxWriteBufferNumber(
+                
internalGetOption(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER));
+
+        currentOptions.setMinWriteBufferNumberToMerge(
+                
internalGetOption(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+
+        BlockBasedTableConfig blockBasedTableConfig;
+        if (tableFormatConfig == null) {
+            blockBasedTableConfig = new BlockBasedTableConfig();
+        } else {
+            if (tableFormatConfig instanceof PlainTableConfig) {
+                // if the table format config is PlainTableConfig, we just 
return current
+                // column-family options
+                return currentOptions;
+            } else {
+                blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
+            }
+        }
+
+        blockBasedTableConfig.setBlockSize(
+                
internalGetOption(RocksDBConfigurableOptions.BLOCK_SIZE).getBytes());
+
+        blockBasedTableConfig.setMetadataBlockSize(
+                
internalGetOption(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE).getBytes());
+
+        blockBasedTableConfig.setBlockCacheSize(
+                
internalGetOption(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE).getBytes());
+
+        if (internalGetOption(RocksDBConfigurableOptions.USE_BLOOM_FILTER)) {
+            final double bitsPerKey =
+                    
internalGetOption(RocksDBConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY);
+            final boolean blockBasedMode =
+                    
internalGetOption(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE);
+            BloomFilter bloomFilter = new BloomFilter(bitsPerKey, 
blockBasedMode);
+            handlesToClose.add(bloomFilter);
+            blockBasedTableConfig.setFilterPolicy(bloomFilter);
+        }
+
+        return currentOptions.setTableFormatConfig(blockBasedTableConfig);
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 63c4afa..71d3ba0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -27,6 +27,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
@@ -89,8 +90,12 @@ public class RocksDBResource extends ExternalResource {
                             LOG.error("Close previous DBOptions's instance 
failed.", e);
                         }
 
-                        return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createDBOptions(
-                                handlesToClose);
+                        return new DBOptions()
+                                .setIncreaseParallelism(4)
+                                .setUseFsync(false)
+                                .setMaxOpenFiles(-1)
+                                .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
+                                .setStatsDumpPeriodSec(0);
                     }
 
                     @Override
@@ -104,9 +109,7 @@ public class RocksDBResource extends ExternalResource {
                             LOG.error("Close previous ColumnOptions's instance 
failed.", e);
                         }
 
-                        return PredefinedOptions.FLASH_SSD_OPTIMIZED
-                                .createColumnOptions(handlesToClose)
-                                .optimizeForPointLookup(40960);
+                        return new 
ColumnFamilyOptions().optimizeForPointLookup(40960);
                     }
                 });
     }
@@ -156,13 +159,14 @@ public class RocksDBResource extends ExternalResource {
         this.dbOptions =
                 optionsFactory
                         .createDBOptions(
-                                
PredefinedOptions.DEFAULT.createDBOptions(handlesToClose),
+                                new DBOptions()
+                                        .setUseFsync(false)
+                                        
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
+                                        .setStatsDumpPeriodSec(0),
                                 handlesToClose)
                         .setCreateIfMissing(true);
         this.columnFamilyOptions =
-                optionsFactory.createColumnOptions(
-                        
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose),
-                        handlesToClose);
+                optionsFactory.createColumnOptions(new ColumnFamilyOptions(), 
handlesToClose);
         this.writeOptions = new WriteOptions();
         this.writeOptions.disableWAL();
         this.readOptions = new ReadOptions();
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 5fcfe3e..4e494ee 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -59,7 +59,6 @@ import org.rocksdb.InfoLogLevel;
 import org.rocksdb.util.SizeUnit;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -464,63 +463,8 @@ public class RocksDBStateBackendConfigTest {
     }
 
     @Test
-    public void testSetConfigurableOptions() throws Exception {
-        DefaultConfigurableOptionsFactory customizedOptions =
-                new DefaultConfigurableOptionsFactory()
-                        .setMaxBackgroundThreads(4)
-                        .setMaxOpenFiles(-1)
-                        .setLogLevel(InfoLogLevel.DEBUG_LEVEL)
-                        .setLogDir("/tmp/rocksdb-logs/")
-                        .setLogFileNum(10)
-                        .setMaxLogFileSize("2MB")
-                        .setCompactionStyle(CompactionStyle.LEVEL)
-                        .setUseDynamicLevelSize(true)
-                        .setTargetFileSizeBase("4MB")
-                        .setMaxSizeLevelBase("128 mb")
-                        .setWriteBufferSize("128 MB")
-                        .setMaxWriteBufferNumber(4)
-                        .setMinWriteBufferNumberToMerge(3)
-                        .setBlockSize("64KB")
-                        .setMetadataBlockSize("16KB")
-                        .setBlockCacheSize("512mb")
-                        .setUseBloomFilter(true)
-                        .setBloomFilterBitsPerKey(12.0)
-                        .setBloomFilterBlockBasedMode(false);
-
-        try (RocksDBResourceContainer optionsContainer =
-                new RocksDBResourceContainer(PredefinedOptions.DEFAULT, 
customizedOptions)) {
-
-            DBOptions dbOptions = optionsContainer.getDbOptions();
-            assertEquals(-1, dbOptions.maxOpenFiles());
-
-            assertEquals(InfoLogLevel.DEBUG_LEVEL, dbOptions.infoLogLevel());
-            assertEquals("/tmp/rocksdb-logs/", dbOptions.dbLogDir());
-            assertEquals(10, dbOptions.keepLogFileNum());
-            assertEquals(2 * SizeUnit.MB, dbOptions.maxLogFileSize());
-
-            ColumnFamilyOptions columnOptions = 
optionsContainer.getColumnOptions();
-            assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
-            assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
-            assertEquals(4 * SizeUnit.MB, columnOptions.targetFileSizeBase());
-            assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
-            assertEquals(4, columnOptions.maxWriteBufferNumber());
-            assertEquals(3, columnOptions.minWriteBufferNumberToMerge());
-
-            BlockBasedTableConfig tableConfig =
-                    (BlockBasedTableConfig) columnOptions.tableFormatConfig();
-            assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
-            assertEquals(16 * SizeUnit.KB, tableConfig.metadataBlockSize());
-            assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
-            assertTrue(tableConfig.filterPolicy() instanceof BloomFilter);
-        }
-    }
-
-    @Test
     public void testConfigurableOptionsFromConfig() throws Exception {
         Configuration configuration = new Configuration();
-        DefaultConfigurableOptionsFactory defaultOptionsFactory =
-                new DefaultConfigurableOptionsFactory();
-        
assertTrue(defaultOptionsFactory.configure(configuration).getConfiguredOptions().isEmpty());
 
         // verify illegal configuration
         {
@@ -570,12 +514,9 @@ public class RocksDBStateBackendConfigTest {
             
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 
mb");
             
configuration.setString(RocksDBConfigurableOptions.USE_BLOOM_FILTER.key(), 
"TRUE");
 
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(configuration);
-
             try (RocksDBResourceContainer optionsContainer =
-                    new RocksDBResourceContainer(PredefinedOptions.DEFAULT, 
optionsFactory)) {
+                    new RocksDBResourceContainer(
+                            configuration, PredefinedOptions.DEFAULT, null, 
null)) {
 
                 DBOptions dbOptions = optionsContainer.getDbOptions();
                 assertEquals(-1, dbOptions.maxOpenFiles());
@@ -649,6 +590,32 @@ public class RocksDBStateBackendConfigTest {
     }
 
     @Test
+    public void testPredefinedAndConfigurableOptions() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(RocksDBConfigurableOptions.COMPACTION_STYLE, 
CompactionStyle.UNIVERSAL);
+        try (final RocksDBResourceContainer optionsContainer =
+                new RocksDBResourceContainer(
+                        configuration, 
PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null)) {
+
+            final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
+            assertNotNull(columnFamilyOptions);
+            assertEquals(CompactionStyle.UNIVERSAL, 
columnFamilyOptions.compactionStyle());
+        }
+
+        try (final RocksDBResourceContainer optionsContainer =
+                new RocksDBResourceContainer(
+                        new Configuration(),
+                        PredefinedOptions.SPINNING_DISK_OPTIMIZED,
+                        null,
+                        null)) {
+
+            final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
+            assertNotNull(columnFamilyOptions);
+            assertEquals(CompactionStyle.LEVEL, 
columnFamilyOptions.compactionStyle());
+        }
+    }
+
+    @Test
     public void testPredefinedAndOptionsFactory() throws Exception {
         final RocksDBOptionsFactory optionsFactory =
                 new RocksDBOptionsFactory() {
@@ -676,18 +643,6 @@ public class RocksDBStateBackendConfigTest {
         }
     }
 
-    @Test
-    public void testPredefinedOptionsEnum() {
-        ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
-        for (PredefinedOptions o : PredefinedOptions.values()) {
-            try (DBOptions opt = o.createDBOptions(handlesToClose)) {
-                assertNotNull(opt);
-            }
-        }
-        handlesToClose.forEach(IOUtils::closeQuietly);
-        handlesToClose.clear();
-    }
-
     // ------------------------------------------------------------------------
     //  Reconfiguration
     // ------------------------------------------------------------------------
@@ -846,9 +801,9 @@ public class RocksDBStateBackendConfigTest {
         Configuration configuration = new Configuration();
         configuration.setString(configOption.key(), configValue);
 
-        DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
+        EmbeddedRocksDBStateBackend stateBackend = new 
EmbeddedRocksDBStateBackend();
         try {
-            optionsFactory.configure(configuration);
+            stateBackend.configure(configuration, null);
             fail("Not throwing expected IllegalArgumentException.");
         } catch (IllegalArgumentException e) {
             // ignored

Reply via email to