This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a8580ca [FLINK-10912][rocksdb] Configurable RocksDBStateBackend
options
a8580ca is described below
commit a8580ca08dab19f83ee87ea83e0801c516687e62
Author: Yun Tang <[email protected]>
AuthorDate: Mon Jan 28 17:28:31 2019 +0800
[FLINK-10912][rocksdb] Configurable RocksDBStateBackend options
This closes #7586.
---
.../rocks_db_configurable_configuration.html | 66 ++++
.../generated/rocks_db_configuration.html | 10 +
docs/ops/config.md | 7 +
docs/ops/state/large_state_tuning.md | 73 ++--
.../state/ConfigurableOptionsFactory.java | 39 ++
.../state/DefaultConfigurableOptionsFactory.java | 425 +++++++++++++++++++++
.../state/RocksDBConfigurableOptions.java | 123 ++++++
.../contrib/streaming/state/RocksDBOptions.java | 25 ++
.../streaming/state/RocksDBStateBackend.java | 89 ++++-
.../state/RocksDBStateBackendConfigTest.java | 169 ++++++++
10 files changed, 991 insertions(+), 35 deletions(-)
diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html
b/docs/_includes/generated/rocks_db_configurable_configuration.html
new file mode 100644
index 0000000..638a3a5
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -0,0 +1,66 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 65%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>state.backend.rocksdb.block.blocksize</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The approximate size (in bytes) of user data packed per block.
RocksDB has default blocksize as '4KB'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.block.cache-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The amount of the cache for data blocks in RocksDB. RocksDB
has default block-cache size as '8MB'.</td>
+ </tr>
+ <tr>
+
<td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The upper-bound of the total size of level base files in
bytes. RocksDB has default configuration as '10MB'.</td>
+ </tr>
+ <tr>
+
<td><h5>state.backend.rocksdb.compaction.level.target-file-size-base</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The target file size for compaction, which determines a
level-1 file size. RocksDB has default configuration as '2MB'.</td>
+ </tr>
+ <tr>
+
<td><h5>state.backend.rocksdb.compaction.level.use-dynamic-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>If true, RocksDB will pick target size of each level
dynamically. From an empty DB, RocksDB would make last level the base level,
which means merging L0 data into the last level, until it exceeds
max_bytes_for_level_base. And then repeat this process for second last level
and so on. RocksDB has default configuration as 'false'. For more information,
please refer to <a
href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is
[...]
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.compaction.style</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The specified compaction style for DB. Candidate compaction
style is LEVEL, FIFO or UNIVERSAL, and RocksDB chooses 'LEVEL' as the default
style.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.files.open</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The maximum number of open files that can be used by the DB,
'-1' means no limit. RocksDB has default configuration as '5000'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.thread.num</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The maximum number of concurrent background flush and
compaction jobs. RocksDB has default configuration as '1'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.writebuffer.count</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The maximum number of write buffers that are built up in
memory. RocksDB has default configuration as '2'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.writebuffer.number-to-merge</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The minimum number of write buffers that will be merged
together before writing to storage. RocksDB has default configuration as
'1'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.writebuffer.size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The amount of data built up in memory (backed by an unsorted
log on disk) before converting to a sorted on-disk files. RocksDB has default
writebuffer size as '4MB'.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/_includes/generated/rocks_db_configuration.html
b/docs/_includes/generated/rocks_db_configuration.html
index b88c18e..88e8c46 100644
--- a/docs/_includes/generated/rocks_db_configuration.html
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -18,6 +18,16 @@
<td>The local directory (on the TaskManager) where RocksDB puts
its files.</td>
</tr>
<tr>
+ <td><h5>state.backend.rocksdb.options-factory</h5></td>
+ <td style="word-wrap:
break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
+ <td>The options factory class for RocksDB to create DBOptions and
ColumnFamilyOptions. The default options factory is
org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and
it would read the configured options which provided in
'RocksDBConfigurableOptions'.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.predefined-options</h5></td>
+ <td style="word-wrap: break-word;">"DEFAULT"</td>
+ <td>The predefined settings for RocksDB DBOptions and
ColumnFamilyOptions by Flink community. Current supported candidate
predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED,
SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user
customized options and options from the OptionsFactory are applied on top of
these predefined ones.</td>
+ </tr>
+ <tr>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
<td style="word-wrap: break-word;">"HEAP"</td>
<td>This determines the factory for timer service state
implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an
implementation based on RocksDB .</td>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 5c9ad82..17c0814 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -162,6 +162,13 @@ The configuration keys in this section are independent of
the used resource mana
{% include generated/rocks_db_configuration.html %}
+### RocksDB Configurable Options
+Specific RocksDB configurable options, provided by Flink, to create a
corresponding `ConfigurableOptionsFactory`.
+And the created one would be used as the default `OptionsFactory` in
`RocksDBStateBackend`
+unless the user defines an `OptionsFactory` and sets it via
`RocksDBStateBackend.setOptions(optionsFactory)`
+
+{% include generated/rocks_db_configurable_configuration.html %}
+
### Queryable State
{% include generated/queryable_state_configuration.html %}
diff --git a/docs/ops/state/large_state_tuning.md
b/docs/ops/state/large_state_tuning.md
index d175ca6..effea99 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -153,36 +153,61 @@ Possible choices are `heap` (to store timers on the heap,
default) and `rocksdb`
<span class="label label-info">Note</span> *The combination RocksDB state
backend / with incremental checkpoint / with heap-based timers currently does
NOT support asynchronous snapshots for the timers state.
Other state like keyed state is still snapshotted asynchronously. Please note
that this is not a regression from previous versions and will be resolved with
`FLINK-10026`.*
-**Passing Options to RocksDB**
+**Predefined Options**
-{% highlight java %}
-RocksDBStateBackend.setOptions(new MyOptions());
+Flink provides some predefined collections of options for RocksDB for different
settings, and there exist two ways
+to pass these predefined options to RocksDB:
+ - Configure the predefined options through `flink-conf.yaml` via option key
`state.backend.rocksdb.predefined-options`.
+ The default value of this option is `DEFAULT` which means
`PredefinedOptions.DEFAULT`.
+ - Set the predefined options programmatically, e.g.
`RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
-public class MyOptions implements OptionsFactory {
+We expect to accumulate more such profiles over time. Feel free to contribute
such predefined option profiles when you
+find a set of options that work well and seem representative for certain
workloads.
- @Override
- public DBOptions createDBOptions(DBOptions currentOptions) {
- return currentOptions.setIncreaseParallelism(4)
- .setUseFsync(false);
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions
currentOptions) {
- return currentOptions.setTableFormatConfig(
- new BlockBasedTableConfig()
- .setBlockCacheSize(256 * 1024 * 1024) // 256 MB
- .setBlockSize(128 * 1024)); // 128 KB
- }
-}
-{% endhighlight %}
+<span class="label label-info">Note</span> Predefined options which are set
programmatically will override the ones configured via `flink-conf.yaml`.
-**Predefined Options**
+**Passing Options Factory to RocksDB**
-Flink provides some predefined collections of option for RocksDB for different
settings, which can be set for example via
-`RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
+There exist two ways to pass an options factory to RocksDB in Flink:
-We expect to accumulate more such profiles over time. Feel free to contribute
such predefined option profiles when you
-found a set of options that work well and seem representative for certain
workloads.
+ - Configure options factory through `flink-conf.yaml`. You could set the
options factory class name via option key
`state.backend.rocksdb.options-factory`.
+ The default value for this option is
`org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`,
and all candidate configurable options are defined in
`RocksDBConfigurableOptions`.
+ Moreover, you could also define your customized and configurable options
factory class like below and pass the class name to
`state.backend.rocksdb.options-factory`.
+
+ {% highlight java %}
+
+ public class MyOptionsFactory implements ConfigurableOptionsFactory {
+
+ private static final long DEFAULT_SIZE = 256 * 1024 * 1024; // 256 MB
+ private long blockCacheSize = DEFAULT_SIZE;
+
+ @Override
+ public DBOptions createDBOptions(DBOptions currentOptions) {
+ return currentOptions.setIncreaseParallelism(4)
+ .setUseFsync(false);
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions
currentOptions) {
+ return currentOptions.setTableFormatConfig(
+ new BlockBasedTableConfig()
+ .setBlockCacheSize(blockCacheSize)
+ .setBlockSize(128 * 1024)); // 128 KB
+ }
+
+ @Override
+ public OptionsFactory configure(Configuration configuration) {
+ this.blockCacheSize =
+ configuration.getLong("my.custom.rocksdb.block.cache.size",
DEFAULT_SIZE);
+ return this;
+ }
+ }
+ {% endhighlight %}
+
+ - Set the options factory programmatically, e.g.
`RocksDBStateBackend.setOptions(new MyOptionsFactory());`
+
+<span class="label label-info">Note</span> An options factory which is set
programmatically will override the one configured via `flink-conf.yaml`,
+and the options factory has a higher priority over the predefined options if ever
configured or set.
<span class="label label-info">Note</span> RocksDB is a native library that
allocates memory directly from the process,
and not from the JVM. Any memory you assign to RocksDB will have to be
accounted for, typically by decreasing the JVM heap size
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
new file mode 100644
index 0000000..80d1a1f
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An options factory interface that picks up additional parameters from a
configuration.
+ */
+public interface ConfigurableOptionsFactory extends OptionsFactory {
+
+ /**
+ * Creates a variant of the options factory that applies additional
configuration parameters.
+ *
+ * <p>If no configuration is applied, or if the method directly applies
configuration values to
+ * the (mutable) options factory object, this method may return the
original options factory object.
+ * Otherwise it typically returns a modified copy.
+ *
+ * @param configuration The configuration to pick the values from.
+ * @return A reconfigured options factory.
+ */
+ OptionsFactory configure(Configuration configuration);
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
new file mode 100644
index 0000000..5f5f7d1
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.TableFormatConfig;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.COMPACTION_STYLE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BUFFER_SIZE;
+
+/**
+ * An implementation of {@link ConfigurableOptionsFactory} using options
provided by {@link RocksDBConfigurableOptions},
+ * acting as the default options factory within {@link RocksDBStateBackend}
if the user has not defined an {@link OptionsFactory}.
+ *
+ * <p>This implementation also provides some setters to let users create an
{@link OptionsFactory} conveniently.
+ */
+public class DefaultConfigurableOptionsFactory implements
ConfigurableOptionsFactory {
+
+ private final Map<String, String> configuredOptions = new HashMap<>();
+
+ @Override
+ public DBOptions createDBOptions(DBOptions currentOptions) {
+ if (isOptionConfigured(MAX_BACKGROUND_THREADS)) {
+
currentOptions.setIncreaseParallelism(getMaxBackgroundThreads());
+ }
+
+ if (isOptionConfigured(MAX_OPEN_FILES)) {
+ currentOptions.setMaxOpenFiles(getMaxOpenFiles());
+ }
+
+ return currentOptions;
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions
currentOptions) {
+ if (isOptionConfigured(COMPACTION_STYLE)) {
+ currentOptions.setCompactionStyle(getCompactionStyle());
+ }
+
+ if (isOptionConfigured(USE_DYNAMIC_LEVEL_SIZE)) {
+
currentOptions.setLevelCompactionDynamicLevelBytes(getUseDynamicLevelSize());
+ }
+
+ if (isOptionConfigured(TARGET_FILE_SIZE_BASE)) {
+
currentOptions.setTargetFileSizeBase(getTargetFileSizeBase());
+ }
+
+ if (isOptionConfigured(MAX_SIZE_LEVEL_BASE)) {
+
currentOptions.setMaxBytesForLevelBase(getMaxSizeLevelBase());
+ }
+
+ if (isOptionConfigured(WRITE_BUFFER_SIZE)) {
+ currentOptions.setWriteBufferSize(getWriteBufferSize());
+ }
+
+ if (isOptionConfigured(MAX_WRITE_BUFFER_NUMBER)) {
+
currentOptions.setMaxWriteBufferNumber(getMaxWriteBufferNumber());
+ }
+
+ if (isOptionConfigured(MIN_WRITE_BUFFER_NUMBER_TO_MERGE)) {
+
currentOptions.setMinWriteBufferNumberToMerge(getMinWriteBufferNumberToMerge());
+ }
+
+ TableFormatConfig tableFormatConfig =
currentOptions.tableFormatConfig();
+
+ BlockBasedTableConfig blockBasedTableConfig;
+ if (tableFormatConfig == null) {
+ blockBasedTableConfig = new BlockBasedTableConfig();
+ } else {
+ if (tableFormatConfig instanceof PlainTableConfig) {
+ // if the table format config is
PlainTableConfig, we just return current column-family options
+ return currentOptions;
+ } else {
+ blockBasedTableConfig = (BlockBasedTableConfig)
tableFormatConfig;
+ }
+ }
+
+ if (isOptionConfigured(BLOCK_SIZE)) {
+ blockBasedTableConfig.setBlockSize(getBlockSize());
+ }
+
+ if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
+
blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
+ }
+
+ return
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+ }
+
+ public Map<String, String> getConfiguredOptions() {
+ return new HashMap<>(configuredOptions);
+ }
+
+ private boolean isOptionConfigured(ConfigOption configOption) {
+ return configuredOptions.containsKey(configOption.key());
+ }
+
+
//--------------------------------------------------------------------------
+ // Maximum number of concurrent background flush and compaction threads
+
//--------------------------------------------------------------------------
+
+ private int getMaxBackgroundThreads() {
+ return
Integer.parseInt(getInternal(MAX_BACKGROUND_THREADS.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setMaxBackgroundThreads(int
totalThreadCount) {
+ Preconditions.checkArgument(totalThreadCount > 0);
+ configuredOptions.put(MAX_BACKGROUND_THREADS.key(),
String.valueOf(totalThreadCount));
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // Maximum number of open files
+
//--------------------------------------------------------------------------
+
+ private int getMaxOpenFiles() {
+ return Integer.parseInt(getInternal(MAX_OPEN_FILES.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setMaxOpenFiles(int
maxOpenFiles) {
+ configuredOptions.put(MAX_OPEN_FILES.key(),
String.valueOf(maxOpenFiles));
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // The style of compaction for DB.
+
//--------------------------------------------------------------------------
+
+ private CompactionStyle getCompactionStyle() {
+ return
CompactionStyle.valueOf(getInternal(COMPACTION_STYLE.key()).toUpperCase());
+ }
+
+ public DefaultConfigurableOptionsFactory
setCompactionStyle(CompactionStyle compactionStyle) {
+ setInternal(COMPACTION_STYLE.key(), compactionStyle.name());
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // Whether to configure RocksDB to pick target size of each level
dynamically.
+
//--------------------------------------------------------------------------
+
+ private boolean getUseDynamicLevelSize() {
+ return
getInternal(USE_DYNAMIC_LEVEL_SIZE.key()).compareToIgnoreCase("false") != 0;
+ }
+
+ public DefaultConfigurableOptionsFactory setUseDynamicLevelSize(boolean
value) {
+ configuredOptions.put(USE_DYNAMIC_LEVEL_SIZE.key(), value ?
"true" : "false");
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // The target file size for compaction, i.e., the per-file size for
level-1
+
//--------------------------------------------------------------------------
+
+ private long getTargetFileSizeBase() {
+ return
MemorySize.parseBytes(getInternal(TARGET_FILE_SIZE_BASE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setTargetFileSizeBase(String
targetFileSizeBase) {
+
Preconditions.checkArgument(MemorySize.parseBytes(targetFileSizeBase) > 0,
+ "Invalid configuration " + targetFileSizeBase + " for
target file size base.");
+ setInternal(TARGET_FILE_SIZE_BASE.key(), targetFileSizeBase);
+ return this;
+ }
+
+
+
//--------------------------------------------------------------------------
+ // Maximum total data size for a level, i.e., the max total size for
level-1
+
//--------------------------------------------------------------------------
+
+ private long getMaxSizeLevelBase() {
+ return
MemorySize.parseBytes(getInternal(MAX_SIZE_LEVEL_BASE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setMaxSizeLevelBase(String
maxSizeLevelBase) {
+
Preconditions.checkArgument(MemorySize.parseBytes(maxSizeLevelBase) > 0,
+ "Invalid configuration " + maxSizeLevelBase + " for max
size of level base.");
+ setInternal(MAX_SIZE_LEVEL_BASE.key(), maxSizeLevelBase);
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // Amount of data to build up in memory (backed by an unsorted log on
disk)
+ // before converting to a sorted on-disk file. Larger values increase
+ // performance, especially during bulk loads.
+
//--------------------------------------------------------------------------
+
+ private long getWriteBufferSize() {
+ return
MemorySize.parseBytes(getInternal(WRITE_BUFFER_SIZE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setWriteBufferSize(String
writeBufferSize) {
+
Preconditions.checkArgument(MemorySize.parseBytes(writeBufferSize) > 0,
+ "Invalid configuration " + writeBufferSize + " for
write-buffer size.");
+
+ setInternal(WRITE_BUFFER_SIZE.key(), writeBufferSize);
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // The maximum number of write buffers that are built up in memory.
+
//--------------------------------------------------------------------------
+
+ private int getMaxWriteBufferNumber() {
+ return
Integer.parseInt(getInternal(MAX_WRITE_BUFFER_NUMBER.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setMaxWriteBufferNumber(int
writeBufferNumber) {
+ Preconditions.checkArgument(writeBufferNumber > 0,
+ "Invalid configuration " + writeBufferNumber + " for
max write-buffer number.");
+ setInternal(MAX_WRITE_BUFFER_NUMBER.key(),
Integer.toString(writeBufferNumber));
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // The minimum number that will be merged together before writing to
storage
+
//--------------------------------------------------------------------------
+
+ private int getMinWriteBufferNumberToMerge() {
+ return
Integer.parseInt(getInternal(MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory
setMinWriteBufferNumberToMerge(int writeBufferNumber) {
+ Preconditions.checkArgument(writeBufferNumber > 0,
+ "Invalid configuration " + writeBufferNumber + " for
min write-buffer number to merge.");
+ setInternal(MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
Integer.toString(writeBufferNumber));
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // Approximate size of user data packed per block. Note that the block
size
+ // specified here corresponds to uncompressed data. The actual size of
the
+ // unit read from disk may be smaller if compression is enabled
+
//--------------------------------------------------------------------------
+
+ private long getBlockSize() {
+ return MemorySize.parseBytes(getInternal(BLOCK_SIZE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setBlockSize(String blockSize)
{
+ Preconditions.checkArgument(MemorySize.parseBytes(blockSize) >
0,
+ "Invalid configuration " + blockSize + " for block
size.");
+ setInternal(BLOCK_SIZE.key(), blockSize);
+ return this;
+ }
+
+
//--------------------------------------------------------------------------
+ // The amount of the cache for data blocks in RocksDB
+
//--------------------------------------------------------------------------
+
+ private long getBlockCacheSize() {
+ return
MemorySize.parseBytes(getInternal(BLOCK_CACHE_SIZE.key()));
+ }
+
+ public DefaultConfigurableOptionsFactory setBlockCacheSize(String
blockCacheSize) {
+
Preconditions.checkArgument(MemorySize.parseBytes(blockCacheSize) > 0,
+ "Invalid configuration " + blockCacheSize + " for block
cache size.");
+ setInternal(BLOCK_CACHE_SIZE.key(), blockCacheSize);
+
+ return this;
+ }
+
+ private static final String[] CANDIDATE_CONFIGS = new String[]{
+ // configurable DBOptions
+ MAX_BACKGROUND_THREADS.key(),
+ MAX_OPEN_FILES.key(),
+
+ // configurable ColumnFamilyOptions
+ COMPACTION_STYLE.key(),
+ USE_DYNAMIC_LEVEL_SIZE.key(),
+ TARGET_FILE_SIZE_BASE.key(),
+ MAX_SIZE_LEVEL_BASE.key(),
+ WRITE_BUFFER_SIZE.key(),
+ MAX_WRITE_BUFFER_NUMBER.key(),
+ MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
+ BLOCK_SIZE.key(),
+ BLOCK_CACHE_SIZE.key()
+ };
+
+ private static final Set<String> POSITIVE_INT_CONFIG_SET = new
HashSet<>(Arrays.asList(
+ MAX_BACKGROUND_THREADS.key(),
+ MAX_WRITE_BUFFER_NUMBER.key(),
+ MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()
+ ));
+
+ private static final Set<String> SIZE_CONFIG_SET = new
HashSet<>(Arrays.asList(
+ TARGET_FILE_SIZE_BASE.key(),
+ MAX_SIZE_LEVEL_BASE.key(),
+ WRITE_BUFFER_SIZE.key(),
+ BLOCK_SIZE.key(),
+ BLOCK_CACHE_SIZE.key()
+ ));
+
+ private static final Set<String> BOOLEAN_CONFIG_SET = new
HashSet<>(Collections.singletonList(
+ USE_DYNAMIC_LEVEL_SIZE.key()
+ ));
+
+ private static final Set<String> COMPACTION_STYLE_SET =
Arrays.stream(CompactionStyle.values())
+ .map(c -> c.name().toLowerCase()).collect(Collectors.toSet());
+
+ /**
+ * Creates a {@link DefaultConfigurableOptionsFactory} instance from a
{@link Configuration}.
+ *
+ * <p>If no options within {@link RocksDBConfigurableOptions} has ever
been configured,
+ * the created OptionsFactory would not override anything defined in
{@link PredefinedOptions}.
+ *
+ * @param configuration Configuration to be used for the
ConfigurableOptionsFactory creation
+ * @return A ConfigurableOptionsFactory created from the given
configuration
+ */
+ @Override
+ public DefaultConfigurableOptionsFactory configure(Configuration
configuration) {
+ for (String key : CANDIDATE_CONFIGS) {
+ String newValue = configuration.getString(key, null);
+
+ if (newValue != null) {
+ if (checkArgumentValid(key, newValue)) {
+ this.configuredOptions.put(key,
newValue);
+ }
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultConfigurableOptionsFactory{" +
+ "configuredOptions=" + configuredOptions +
+ '}';
+ }
+
+ /**
+ * Helper method to check whether the (key,value) is valid through
given configuration and returns the formatted value.
+ *
+ * @param key The configuration key which is configurable in {@link
RocksDBConfigurableOptions}.
+ * @param value The value within given configuration.
+ *
+ * @return whether the given key and value in string format is legal.
+ */
+ private static boolean checkArgumentValid(String key, String value) {
+ if (POSITIVE_INT_CONFIG_SET.contains(key)) {
+
+ Preconditions.checkArgument(Integer.parseInt(value) > 0,
+ "Configured value for key: " + key + " must be
larger than 0.");
+ } else if (SIZE_CONFIG_SET.contains(key)) {
+
+
Preconditions.checkArgument(MemorySize.parseBytes(value) > 0,
+ "Configured size for key" + key + " must be
larger than 0.");
+ } else if (BOOLEAN_CONFIG_SET.contains(key)) {
+
+
Preconditions.checkArgument("true".equalsIgnoreCase(value) ||
"false".equalsIgnoreCase(value),
+ "The configured boolean value: " + value + "
for key: " + key + " is illegal.");
+ } else if (key.equals(COMPACTION_STYLE.key())) {
+ value = value.toLowerCase();
+
Preconditions.checkArgument(COMPACTION_STYLE_SET.contains(value),
+ "Compression type: " + value + " is not
recognized with legal types: " + String.join(", ", COMPACTION_STYLE_SET));
+ }
+ return true;
+ }
+
+ /**
+ * Sets the configuration with (key, value) if the key is predefined,
otherwise throws IllegalArgumentException.
+ *
+ * @param key The configuration key, if key is not predefined, throws
IllegalArgumentException out.
+ * @param value The configuration value.
+ */
+ private void setInternal(String key, String value) {
+ Preconditions.checkArgument(value != null && !value.isEmpty(),
+ "The configuration value must not be empty.");
+
+ configuredOptions.put(key, value);
+ }
+
+ /**
+ * Returns the value in string format with the given key.
+ *
+ * @param key The configuration-key to query in string format.
+ */
+ private String getInternal(String key) {
+ Preconditions.checkArgument(configuredOptions.containsKey(key),
+ "The configuration " + key + " has not been
configured.");
+
+ return configuredOptions.get(key);
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
new file mode 100644
index 0000000..089f08c
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.description.Description;
+
+import java.io.Serializable;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.rocksdb.CompactionStyle.FIFO;
+import static org.rocksdb.CompactionStyle.LEVEL;
+import static org.rocksdb.CompactionStyle.UNIVERSAL;
+
+/**
+ * This class contains the configuration options for the {@link
DefaultConfigurableOptionsFactory}.
+ *
+ * <p>If nothing specified, RocksDB's options would be configured by {@link
PredefinedOptions} and user-defined {@link OptionsFactory}.
+ *
+ * <p>If some options have been specifically configured, a corresponding {@link
DefaultConfigurableOptionsFactory} would be created
+ * and applied on top of {@link PredefinedOptions} except if a user-defined
{@link OptionsFactory} overrides it.
+ */
+public class RocksDBConfigurableOptions implements Serializable {
+
+
//--------------------------------------------------------------------------
+ // Provided configurable DBOptions within Flink
+
//--------------------------------------------------------------------------
+
+ public static final ConfigOption<String> MAX_BACKGROUND_THREADS =
+ key("state.backend.rocksdb.thread.num")
+ .noDefaultValue()
+ .withDescription("The maximum number of concurrent
background flush and compaction jobs. " +
+ "RocksDB has default configuration as '1'.");
+
+ public static final ConfigOption<String> MAX_OPEN_FILES =
+ key("state.backend.rocksdb.files.open")
+ .noDefaultValue()
+ .withDescription("The maximum number of open files that
can be used by the DB, '-1' means no limit. " +
+ "RocksDB has default configuration as '5000'.");
+
+
//--------------------------------------------------------------------------
+ // Provided configurable ColumnFamilyOptions within Flink
+
//--------------------------------------------------------------------------
+
+ public static final ConfigOption<String> COMPACTION_STYLE =
+ key("state.backend.rocksdb.compaction.style")
+ .noDefaultValue()
+                .withDescription(String.format("The specified
compaction style for DB. Candidate compaction styles are %s, %s or %s, " +
+                        "and RocksDB chooses '%s' as the default
style.", LEVEL.name(), FIFO.name(), UNIVERSAL.name(),
+ LEVEL.name()));
+
+ public static final ConfigOption<String> USE_DYNAMIC_LEVEL_SIZE =
+ key("state.backend.rocksdb.compaction.level.use-dynamic-size")
+ .noDefaultValue()
+ .withDescription(Description.builder().text("If true,
RocksDB will pick target size of each level dynamically. From an empty DB, ")
+ .text("RocksDB would make last level the base
level, which means merging L0 data into the last level, ")
+ .text("until it exceeds
max_bytes_for_level_base. And then repeat this process for second last level
and so on. ")
+ .text("RocksDB has default configuration as
'false'. ")
+ .text("For more information, please refer to
%s",
+
link("https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true",
+ "RocksDB's doc."))
+ .build());
+
+ public static final ConfigOption<String> TARGET_FILE_SIZE_BASE =
+
key("state.backend.rocksdb.compaction.level.target-file-size-base")
+ .noDefaultValue()
+ .withDescription("The target file size for compaction,
which determines a level-1 file size. " +
+ "RocksDB has default configuration as '2MB'.");
+
+ public static final ConfigOption<String> MAX_SIZE_LEVEL_BASE =
+
key("state.backend.rocksdb.compaction.level.max-size-level-base")
+ .noDefaultValue()
+ .withDescription("The upper-bound of the total size of
level base files in bytes. " +
+ "RocksDB has default configuration as '10MB'.");
+
+ public static final ConfigOption<String> WRITE_BUFFER_SIZE =
+ key("state.backend.rocksdb.writebuffer.size")
+ .noDefaultValue()
+ .withDescription("The amount of data built up in memory
(backed by an unsorted log on disk) " +
+                        "before converting to sorted on-disk files.
RocksDB has default writebuffer size as '4MB'.");
+
+ public static final ConfigOption<String> MAX_WRITE_BUFFER_NUMBER =
+ key("state.backend.rocksdb.writebuffer.count")
+ .noDefaultValue()
+                .withDescription("The maximum number of write buffers
that are built up in memory. " +
+ "RocksDB has default configuration as '2'.");
+
+ public static final ConfigOption<String>
MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
+ key("state.backend.rocksdb.writebuffer.number-to-merge")
+ .noDefaultValue()
+ .withDescription("The minimum number of write buffers
that will be merged together before writing to storage. " +
+ "RocksDB has default configuration as '1'.");
+
+ public static final ConfigOption<String> BLOCK_SIZE =
+ key("state.backend.rocksdb.block.blocksize")
+ .noDefaultValue()
+ .withDescription("The approximate size (in bytes) of
user data packed per block. " +
+ "RocksDB has default blocksize as '4KB'.");
+
+ public static final ConfigOption<String> BLOCK_CACHE_SIZE =
+ key("state.backend.rocksdb.block.cache-size")
+ .noDefaultValue()
+ .withDescription("The amount of the cache for data
blocks in RocksDB. " +
+ "RocksDB has default block-cache size as
'8MB'.");
+
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index f7a5557..8fa497b 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -21,6 +21,10 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import static
org.apache.flink.contrib.streaming.state.PredefinedOptions.DEFAULT;
+import static
org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED;
+import static
org.apache.flink.contrib.streaming.state.PredefinedOptions.SPINNING_DISK_OPTIMIZED;
+import static
org.apache.flink.contrib.streaming.state.PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
import static
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
import static
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
@@ -61,4 +65,25 @@ public class RocksDBOptions {
.withDescription("This determines if compaction filter to
cleanup state with TTL is enabled for backend." +
"Note: User can still decide in state TTL configuration
in state descriptor " +
"whether the filter is active for particular state or
not.");
+ /**
+ * The predefined settings for RocksDB DBOptions and
ColumnFamilyOptions by Flink community.
+ */
+ public static final ConfigOption<String> PREDEFINED_OPTIONS =
ConfigOptions
+ .key("state.backend.rocksdb.predefined-options")
+ .defaultValue(DEFAULT.name())
+ .withDescription(String.format("The predefined settings for
RocksDB DBOptions and ColumnFamilyOptions by Flink community. " +
+ "Current supported candidate predefined-options are %s,
%s, %s or %s. Note that user customized options and options " +
+ "from the OptionsFactory are applied on top of these
predefined ones.",
+ DEFAULT.name(), SPINNING_DISK_OPTIMIZED.name(),
SPINNING_DISK_OPTIMIZED_HIGH_MEM.name(), FLASH_SSD_OPTIMIZED.name()));
+
+ /**
+ * The options factory class for RocksDB to create DBOptions and
ColumnFamilyOptions.
+ */
+ public static final ConfigOption<String> OPTIONS_FACTORY = ConfigOptions
+ .key("state.backend.rocksdb.options-factory")
+ .defaultValue(DefaultConfigurableOptionsFactory.class.getName())
+ .withDescription(String.format("The options factory class for
RocksDB to create DBOptions and ColumnFamilyOptions. " +
+ "The default options factory is %s, and it
would read the configured options which are provided in
'RocksDBConfigurableOptions'.",
+
DefaultConfigurableOptionsFactory.class.getName()));
+
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 5db2e9b..a1bc0c4 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -45,6 +45,8 @@ import
org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
@@ -123,7 +125,8 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
private File[] localRocksDbDirectories;
/** The pre-configured option settings. */
- private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
+ @Nullable
+ private PredefinedOptions predefinedOptions;
/** The options factory to create the RocksDB options in the cluster. */
@Nullable
@@ -338,9 +341,21 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
// configure metric options
this.defaultMetricOptions =
RocksDBNativeMetricOptions.fromConfig(config);
- // copy remaining settings
- this.predefinedOptions = original.predefinedOptions;
- this.optionsFactory = original.optionsFactory;
+ // configure RocksDB predefined options
+ this.predefinedOptions = original.predefinedOptions == null ?
+
PredefinedOptions.valueOf(config.getString(RocksDBOptions.PREDEFINED_OPTIONS))
: original.predefinedOptions;
+ LOG.info("Using predefined options: {}.",
predefinedOptions.name());
+
+ // configure RocksDB options factory
+ try {
+ this.optionsFactory = configureOptionsFactory(
+ original.optionsFactory,
+
config.getString(RocksDBOptions.OPTIONS_FACTORY),
+ config,
+ classLoader);
+ } catch (DynamicCodeLoadingException e) {
+ throw new FlinkRuntimeException(e);
+ }
}
//
------------------------------------------------------------------------
@@ -524,6 +539,53 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
asyncSnapshots);
}
+ private OptionsFactory configureOptionsFactory(
+ @Nullable OptionsFactory originalOptionsFactory,
+ String factoryClassName,
+ Configuration config,
+ ClassLoader classLoader) throws
DynamicCodeLoadingException {
+
+ if (originalOptionsFactory != null) {
+ if (originalOptionsFactory instanceof
ConfigurableOptionsFactory) {
+ originalOptionsFactory =
((ConfigurableOptionsFactory) originalOptionsFactory).configure(config);
+ }
+ LOG.info("Using application-defined options factory:
{}.", originalOptionsFactory);
+
+ return originalOptionsFactory;
+ }
+
+ // if using DefaultConfigurableOptionsFactory by default, we
could avoid reflection to speed up.
+ if
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
{
+ DefaultConfigurableOptionsFactory optionsFactory = new
DefaultConfigurableOptionsFactory();
+ optionsFactory.configure(config);
+ LOG.info("Using default options factory: {}.",
optionsFactory);
+
+ return optionsFactory;
+ } else {
+ try {
+ @SuppressWarnings("rawtypes")
+ Class<? extends OptionsFactory> clazz =
+ Class.forName(factoryClassName, false,
classLoader)
+
.asSubclass(OptionsFactory.class);
+
+ OptionsFactory optionsFactory =
clazz.newInstance();
+ if (optionsFactory instanceof
ConfigurableOptionsFactory) {
+ optionsFactory =
((ConfigurableOptionsFactory) optionsFactory).configure(config);
+ }
+ LOG.info("Using configured options factory:
{}.", optionsFactory);
+
+ return optionsFactory;
+ } catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured options factory
class: " + factoryClassName, e);
+ } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
+ throw new DynamicCodeLoadingException("The
class configured under '" +
+ RocksDBOptions.OPTIONS_FACTORY.key() +
"' is not a valid options factory (" +
+ factoryClassName + ')', e);
+ }
+ }
+ }
+
//
------------------------------------------------------------------------
// Parameters
//
------------------------------------------------------------------------
@@ -663,9 +725,10 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
/**
* Sets the predefined options for RocksDB.
*
- * <p>If a user-defined options factory is set (via {@link
#setOptions(OptionsFactory)}),
+ * <p>If user-configured options within {@link
RocksDBConfigurableOptions} are set (through flink-conf.yaml)
+ * or a user-defined options factory is set (via {@link
#setOptions(OptionsFactory)}),
* then the options from the factory are applied on top of the here
specified
- * predefined options.
+ * predefined options and customized options.
*
* @param options The options to set (must not be null).
*/
@@ -678,12 +741,16 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
* The default options (if nothing was set via {@link
#setPredefinedOptions(PredefinedOptions)})
* are {@link PredefinedOptions#DEFAULT}.
*
- * <p>If a user-defined options factory is set (via {@link
#setOptions(OptionsFactory)}),
- * then the options from the factory are applied on top of the
predefined options.
+ * <p>If user-configured options within {@link
RocksDBConfigurableOptions} are set (through flink-conf.yaml)
+ * or a user-defined options factory is set (via {@link
#setOptions(OptionsFactory)}),
+ * then the options from the factory are applied on top of the
predefined and customized options.
*
* @return The currently set predefined options for RocksDB.
*/
public PredefinedOptions getPredefinedOptions() {
+ if (predefinedOptions == null) {
+ predefinedOptions = PredefinedOptions.DEFAULT;
+ }
return predefinedOptions;
}
@@ -718,9 +785,9 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
*/
public DBOptions getDbOptions() {
// initial options from pre-defined profile
- DBOptions opt = predefinedOptions.createDBOptions();
+ DBOptions opt = getPredefinedOptions().createDBOptions();
- // add user-defined options, if specified
+ // add user-defined options factory, if specified
if (optionsFactory != null) {
opt = optionsFactory.createDBOptions(opt);
}
@@ -736,7 +803,7 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
*/
public ColumnFamilyOptions getColumnOptions() {
// initial options from pre-defined profile
- ColumnFamilyOptions opt =
predefinedOptions.createColumnOptions();
+ ColumnFamilyOptions opt =
getPredefinedOptions().createColumnOptions();
// add user-defined options, if specified
if (optionsFactory != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 7329646..8044a3a 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.FileSystem;
@@ -45,11 +46,14 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;
+import org.rocksdb.util.SizeUnit;
import java.io.File;
+import java.io.IOException;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -383,8 +387,17 @@ public class RocksDBStateBackendConfigTest {
String checkpointPath =
tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(checkpointPath);
+ // verify that we would use PredefinedOptions.DEFAULT by
default.
assertEquals(PredefinedOptions.DEFAULT,
rocksDbBackend.getPredefinedOptions());
+ // verify that user could configure predefined options via
flink-conf.yaml
+ Configuration configuration = new Configuration();
+ configuration.setString(RocksDBOptions.PREDEFINED_OPTIONS,
PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
+ rocksDbBackend = new RocksDBStateBackend(checkpointPath);
+ rocksDbBackend = rocksDbBackend.configure(configuration,
getClass().getClassLoader());
+ assertEquals(PredefinedOptions.FLASH_SSD_OPTIMIZED,
rocksDbBackend.getPredefinedOptions());
+
+ // verify that predefined options could be set programmatically
and override pre-configured one.
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED,
rocksDbBackend.getPredefinedOptions());
@@ -394,10 +407,125 @@ public class RocksDBStateBackendConfigTest {
}
@Test
+ public void testSetConfigurableOptions() throws Exception {
+ String checkpointPath =
tempFolder.newFolder().toURI().toString();
+ RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(checkpointPath);
+
+ assertNull(rocksDbBackend.getOptions());
+
+ DefaultConfigurableOptionsFactory customizedOptions = new
DefaultConfigurableOptionsFactory()
+ .setMaxBackgroundThreads(4)
+ .setMaxOpenFiles(-1)
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setUseDynamicLevelSize(true)
+ .setTargetFileSizeBase("4MB")
+ .setMaxSizeLevelBase("128 mb")
+ .setWriteBufferSize("128 MB")
+ .setMaxWriteBufferNumber(4)
+ .setMinWriteBufferNumberToMerge(3)
+ .setBlockSize("64KB")
+ .setBlockCacheSize("512mb");
+
+ rocksDbBackend.setOptions(customizedOptions);
+
+ try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+ assertEquals(-1, dbOptions.maxOpenFiles());
+ }
+
+ try (ColumnFamilyOptions columnOptions =
rocksDbBackend.getColumnOptions()) {
+ assertEquals(CompactionStyle.LEVEL,
columnOptions.compactionStyle());
+
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+ assertEquals(4 * SizeUnit.MB,
columnOptions.targetFileSizeBase());
+ assertEquals(128 * SizeUnit.MB,
columnOptions.maxBytesForLevelBase());
+ assertEquals(4, columnOptions.maxWriteBufferNumber());
+ assertEquals(3,
columnOptions.minWriteBufferNumberToMerge());
+
+ BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+ assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+ assertEquals(512 * SizeUnit.MB,
tableConfig.blockCacheSize());
+ }
+ }
+
+ @Test
+ public void testConfigurableOptionsFromConfig() throws IOException {
+ Configuration configuration = new Configuration();
+ DefaultConfigurableOptionsFactory defaultOptionsFactory = new
DefaultConfigurableOptionsFactory();
+
assertTrue(defaultOptionsFactory.configure(configuration).getConfiguredOptions().isEmpty());
+
+ // verify illegal configuration
+ {
+
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
+
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
+
verifyIllegalArgument(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
"-1");
+
+
verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
+
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
+
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
+
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
+
+
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
+
+
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
+ }
+
+ // verify legal configuration
+ {
+
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
+
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE,
"TRUE");
+
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8
mb");
+
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE,
"128MB");
+
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
+
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER,
"4");
+
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
"2");
+
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
+
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
+
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
+
+ DefaultConfigurableOptionsFactory optionsFactory = new
DefaultConfigurableOptionsFactory();
+ optionsFactory.configure(configuration);
+ String checkpointPath =
tempFolder.newFolder().toURI().toString();
+ RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(checkpointPath);
+ rocksDbBackend.setOptions(optionsFactory);
+
+ try (DBOptions dbOptions =
rocksDbBackend.getDbOptions()) {
+ assertEquals(-1, dbOptions.maxOpenFiles());
+ }
+
+ try (ColumnFamilyOptions columnOptions =
rocksDbBackend.getColumnOptions()) {
+ assertEquals(CompactionStyle.LEVEL,
columnOptions.compactionStyle());
+
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+ assertEquals(8 * SizeUnit.MB,
columnOptions.targetFileSizeBase());
+ assertEquals(128 * SizeUnit.MB,
columnOptions.maxBytesForLevelBase());
+ assertEquals(4,
columnOptions.maxWriteBufferNumber());
+ assertEquals(2,
columnOptions.minWriteBufferNumberToMerge());
+ assertEquals(64 * SizeUnit.MB,
columnOptions.writeBufferSize());
+
+ BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+ assertEquals(4 * SizeUnit.KB,
tableConfig.blockSize());
+ assertEquals(512 * SizeUnit.MB,
tableConfig.blockCacheSize());
+ }
+ }
+ }
+
+ @Test
public void testOptionsFactory() throws Exception {
String checkpointPath =
tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(checkpointPath);
+ // verify that user-defined options factory could be configured
via flink-conf.yaml
+ Configuration config = new Configuration();
+ config.setString(RocksDBOptions.OPTIONS_FACTORY.key(),
TestOptionsFactory.class.getName());
+ config.setInteger(TestOptionsFactory.BACKGROUND_JOBS_OPTION, 4);
+
+ rocksDbBackend = rocksDbBackend.configure(config,
getClass().getClassLoader());
+
+ assertTrue(rocksDbBackend.getOptions() instanceof
TestOptionsFactory);
+ try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+ assertEquals(4, dbOptions.maxBackgroundJobs());
+ }
+
+ // verify that user-defined options factory could be set
programmatically and override pre-configured one.
rocksDbBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions
currentOptions) {
@@ -543,4 +671,45 @@ public class RocksDBStateBackendConfigTest {
return env;
}
+
+ private void verifyIllegalArgument(
+ ConfigOption<String> configOption,
+ String configValue) {
+ Configuration configuration = new Configuration();
+ configuration.setString(configOption, configValue);
+
+ DefaultConfigurableOptionsFactory optionsFactory = new
DefaultConfigurableOptionsFactory();
+ try {
+ optionsFactory.configure(configuration);
+ fail("Not throwing expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // ignored
+ }
+ }
+
+ /**
+ * An implementation of options factory for testing.
+ */
+ public static class TestOptionsFactory implements
ConfigurableOptionsFactory {
+ public static final String BACKGROUND_JOBS_OPTION =
"my.custom.rocksdb.backgroundJobs";
+
+ private static final int DEFAULT_BACKGROUND_JOBS = 2;
+ private int backgroundJobs = DEFAULT_BACKGROUND_JOBS;
+
+ @Override
+ public DBOptions createDBOptions(DBOptions currentOptions) {
+ return
currentOptions.setMaxBackgroundJobs(backgroundJobs);
+ }
+
+ @Override
+ public ColumnFamilyOptions
createColumnOptions(ColumnFamilyOptions currentOptions) {
+ return
currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+ }
+
+ @Override
+ public OptionsFactory configure(Configuration configuration) {
+ this.backgroundJobs =
configuration.getInteger(BACKGROUND_JOBS_OPTION, DEFAULT_BACKGROUND_JOBS);
+ return this;
+ }
+ }
}