Myasuka commented on a change in pull request #17874:
URL: https://github.com/apache/flink/pull/17874#discussion_r757203945
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -50,26 +56,15 @@
* <p>The following options are set:
*
* <ul>
- * <li>setUseFsync(false)
* <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- * <li>setStatsDumpPeriodSec(0)
* </ul>
*/
- DEFAULT {
-
- @Override
- public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
- return new DBOptions()
- .setUseFsync(false)
- .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- .setStatsDumpPeriodSec(0);
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) {
- return new ColumnFamilyOptions();
- }
- },
+ DEFAULT(
+ new HashMap<ConfigOption<?>, Object>() {
+ {
+ put(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL);
+ }
+ }),
Review comment:
This could be simplified to `Collections.singletonMap(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL)`.
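For example, the constant could look roughly like this (just a sketch; it assumes the enum constructor accepts a `Map<ConfigOption<?>, Object>`, and the explicit type witness may not be needed depending on inference):
~~~ java
DEFAULT(
        Collections.<ConfigOption<?>, Object>singletonMap(
                RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL)),
~~~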
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -131,55 +115,35 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles
* <li>setIncreaseParallelism(4)
* <li>setMinWriteBufferNumberToMerge(3)
* <li>setMaxWriteBufferNumber(4)
- * <li>setUseFsync(false)
* <li>setMaxOpenFiles(-1)
* <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- * <li>setStatsDumpPeriodSec(0)
* <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
* <li>BlockBasedTableConfigsetBlockSize(128 KBytes)
Review comment:
Maybe we can also fix this to `BlockBasedTableConfig.setBlockSize(128 KBytes)`.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -245,6 +268,121 @@ private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConf
return true;
}
+ /** Create a {@link DBOptions} for RocksDB, including some common settings. */
+ DBOptions createDBOptions() {
Review comment:
I think this method should be renamed to `createBaseCommonDBOptions` with `private` scope.
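Something along these lines (only a sketch of the suggested name and visibility; the body is the one from this PR):
~~~ java
/** Creates the base {@link DBOptions} with the common settings shared by all profiles. */
private DBOptions createBaseCommonDBOptions() {
    return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
}
~~~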
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -782,6 +791,29 @@ public void setWriteBatchSize(long writeBatchSize) {
// utilities
// ------------------------------------------------------------------------
+ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableConfig onTop) {
+ if (base == null) {
+ base = new Configuration();
+ }
+ if (onTop == null) {
Review comment:
Actually, the `onTop` cannot be null.
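So the defensive branch could be replaced with a precondition, e.g. (a sketch only; it assumes `org.apache.flink.util.Preconditions`, as used elsewhere in Flink):
~~~ java
if (base == null) {
    base = new Configuration();
}
// onTop is always passed in by the caller, so fail fast instead of silently replacing it
Preconditions.checkNotNull(onTop, "onTop configuration must not be null");
~~~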
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -55,23 +57,23 @@
public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
key("state.backend.rocksdb.thread.num")
.intType()
- .noDefaultValue()
+ .defaultValue(1)
Review comment:
I noticed that RocksDB has changed the default value to 2; I will create a hotfix for this.
Once we fill `RocksDBConfigurableOptions` with default values, we should not say "RocksDB has a default configuration", as Flink then controls the configuration entirely.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -245,6 +268,121 @@ private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConf
return true;
}
+ /** Create a {@link DBOptions} for RocksDB, including some common settings. */
+ DBOptions createDBOptions() {
+ return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
+ }
+
+ /** Create a {@link ColumnFamilyOptions} for RocksDB, including some common settings. */
+ ColumnFamilyOptions createColumnOptions() {
Review comment:
I think this method should be renamed to `createBaseColumnOptions` with `private` scope.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -782,6 +791,29 @@ public void setWriteBatchSize(long writeBatchSize) {
// utilities
// ------------------------------------------------------------------------
+ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableConfig onTop) {
+ if (base == null) {
+ base = new Configuration();
+ }
+ if (onTop == null) {
+ onTop = new Configuration();
+ }
+ Configuration configuration = new Configuration();
+ for (ConfigOption<?> option : RocksDBConfigurableOptions.CANDIDATE_CONFIGS) {
+ Optional<?> baseValue = base.getOptional(option);
+ Optional<?> topValue = onTop.getOptional(option);
+
+ if (topValue.isPresent()) {
+ RocksDBConfigurableOptions.checkArgumentValid(option, topValue.get());
+ configuration.setString(option.key(), topValue.get().toString());
+ } else if (baseValue.isPresent()) {
+ RocksDBConfigurableOptions.checkArgumentValid(option, baseValue.get());
+ configuration.setString(option.key(), baseValue.get().toString());
+ }
Review comment:
I think the code below looks better:
~~~ java
if (topValue.isPresent() || baseValue.isPresent()) {
    Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get();
    RocksDBConfigurableOptions.checkArgumentValid(option, validValue);
    configuration.setString(option.key(), validValue.toString());
}
~~~
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -500,15 +509,15 @@ private RocksDBOptionsFactory configureOptionsFactory(
return originalOptionsFactory;
}
- // if using DefaultConfigurableOptionsFactory by default, we could avoid reflection to speed
- // up.
- if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) {
- DefaultConfigurableOptionsFactory optionsFactory =
- new DefaultConfigurableOptionsFactory();
- optionsFactory.configure(config);
- LOG.info("Using default options factory: {}.", optionsFactory);
-
- return optionsFactory;
+ // From FLINK-24046, we deprecate the DefaultConfigurableOptionsFactory.
+ if (factoryClassName == null) {
+ return null;
+ } else if (factoryClassName.equalsIgnoreCase(
+ DefaultConfigurableOptionsFactory.class.getName())) {
+ LOG.info(
+ "{} is deprecated. Please remove this value from the configuration.",
Review comment:
I think we should tell users whether it is safe to remove this value from the configuration.
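For example, the message could spell that out (wording is only a suggestion, based on the fact that the configurable options are now loaded in `RocksDBResourceContainer`):
~~~ java
LOG.info(
        "{} is deprecated and ignored: the configurable options are now applied by the state "
                + "backend itself, so it is safe to remove this value from the configuration.",
        DefaultConfigurableOptionsFactory.class.getName());
~~~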
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -248,4 +250,78 @@
.defaultValue(false)
.withDescription(
"If true, RocksDB will use block-based filter
instead of full filter, this only take effect when bloom filter is used.");
+
+ public static final ConfigOption<?>[] CANDIDATE_CONFIGS =
Review comment:
This could be a package-private field.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -83,35 +78,24 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles
* <li>setCompactionStyle(CompactionStyle.LEVEL)
* <li>setLevelCompactionDynamicLevelBytes(true)
* <li>setIncreaseParallelism(4)
- * <li>setUseFsync(false)
* <li>setDisableDataSync(true)
* <li>setMaxOpenFiles(-1)
* <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- * <li>setStatsDumpPeriodSec(0)
* </ul>
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need
* to sync data to stable storage.
*/
- SPINNING_DISK_OPTIMIZED {
-
- @Override
- public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
- return new DBOptions()
- .setIncreaseParallelism(4)
- .setUseFsync(false)
- .setMaxOpenFiles(-1)
- .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- .setStatsDumpPeriodSec(0);
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) {
- return new ColumnFamilyOptions()
- .setCompactionStyle(CompactionStyle.LEVEL)
- .setLevelCompactionDynamicLevelBytes(true);
- }
- },
+ SPINNING_DISK_OPTIMIZED(
+ new HashMap<ConfigOption<?>, Object>() {
+ {
+ put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
+ put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
+ put(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL);
+ put(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.LEVEL);
+ put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, true);
+ }
Review comment:
If we choose the anonymous-class way to create a new hash map, these anonymous classes should declare a `serialVersionUID`.
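Roughly like this (sketch; the concrete `serialVersionUID` value is arbitrary):
~~~ java
DEFAULT(
        new HashMap<ConfigOption<?>, Object>() {
            private static final long serialVersionUID = 1L;

            {
                put(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL);
            }
        }),
~~~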
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -127,10 +147,13 @@ public Long getWriteBufferManagerCapacity() {
/** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */
public ColumnFamilyOptions getColumnOptions() {
- // initial options from pre-defined profile
- ColumnFamilyOptions opt = predefinedOptions.createColumnOptions(handlesToClose);
+ // initial options from common profile
+ ColumnFamilyOptions opt = createColumnOptions();
handlesToClose.add(opt);
+ // load configurable options on top of pre-defined profile
+ opt = getColumnFamilyOptionsFromConfigurableOptions(opt, handlesToClose);
Review comment:
There is no need to reassign `opt` here.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
##########
@@ -65,7 +65,13 @@
* An implementation of {@link ConfigurableRocksDBOptionsFactory} using options provided by {@link
* RocksDBConfigurableOptions}. It acts as the default options factory within {@link
* EmbeddedRocksDBStateBackend} if the user did not define a {@link RocksDBOptionsFactory}.
+ *
+ * <p>After FLINK-24046, we refactor the config procedure for RocksDB. User could use {@link
+ * ConfigurableRocksDBOptionsFactory} to apply some customized options. Besides this, we load the
+ * configurable options in {@link RocksDBResourceContainer} instead of {@link
+ * DefaultConfigurableOptionsFactory}. Thus, we mark this factory Deprecated.
*/
+@Deprecated
public class DefaultConfigurableOptionsFactory implements ConfigurableRocksDBOptionsFactory {
Review comment:
Once we deprecate this class, we should also remove its usages as much as possible in the project.
I think we'd better remove the usages in `test_state_backend.py` and `RocksDBStateBackendConfigTest`.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -91,10 +108,13 @@ public RocksDBResourceContainer(
/** Gets the RocksDB {@link DBOptions} to be used for RocksDB instances. */
public DBOptions getDbOptions() {
- // initial options from pre-defined profile
- DBOptions opt = predefinedOptions.createDBOptions(handlesToClose);
+ // initial options from common profile
+ DBOptions opt = createDBOptions();
handlesToClose.add(opt);
+ // load configurable options on top of pre-defined profile
+ opt = getDBOptionsFromConfigurableOptions(opt, handlesToClose);
Review comment:
There is no need to reassign `opt` here. Moreover, there is no need to pass `handlesToClose` in.
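That is, the call could be purely side-effecting, roughly (a sketch; it assumes `getDBOptionsFromConfigurableOptions` mutates the passed `DBOptions` instance and its signature can drop the `handlesToClose` parameter):
~~~ java
// initial options from the common profile
DBOptions opt = createDBOptions();
handlesToClose.add(opt);

// load configurable options on top; the same instance is mutated, so no reassignment is needed
getDBOptionsFromConfigurableOptions(opt);
~~~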
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -248,4 +250,78 @@
.defaultValue(false)
.withDescription(
"If true, RocksDB will use block-based filter
instead of full filter, this only take effect when bloom filter is used.");
+
+ public static final ConfigOption<?>[] CANDIDATE_CONFIGS =
+ new ConfigOption<?>[] {
+ // configurable DBOptions
+ MAX_BACKGROUND_THREADS,
+ MAX_OPEN_FILES,
+ LOG_LEVEL,
+ LOG_MAX_FILE_SIZE,
+ LOG_FILE_NUM,
+ LOG_DIR,
+
+ // configurable ColumnFamilyOptions
+ COMPACTION_STYLE,
+ USE_DYNAMIC_LEVEL_SIZE,
+ TARGET_FILE_SIZE_BASE,
+ MAX_SIZE_LEVEL_BASE,
+ WRITE_BUFFER_SIZE,
+ MAX_WRITE_BUFFER_NUMBER,
+ MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
+ BLOCK_SIZE,
+ METADATA_BLOCK_SIZE,
+ BLOCK_CACHE_SIZE,
+ USE_BLOOM_FILTER,
+ BLOOM_FILTER_BITS_PER_KEY,
+ BLOOM_FILTER_BLOCK_BASED_MODE
+ };
+
+ private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
+ new HashSet<>(
+ Arrays.asList(
+ MAX_BACKGROUND_THREADS,
+ LOG_FILE_NUM,
+ MAX_WRITE_BUFFER_NUMBER,
+ MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+ private static final Set<ConfigOption<?>> SIZE_CONFIG_SET =
+ new HashSet<>(
+ Arrays.asList(
+ TARGET_FILE_SIZE_BASE,
+ MAX_SIZE_LEVEL_BASE,
+ WRITE_BUFFER_SIZE,
+ BLOCK_SIZE,
+ METADATA_BLOCK_SIZE,
+ BLOCK_CACHE_SIZE));
+
+ /**
+ * Helper method to check whether the (key,value) is valid through given configuration and
+ * returns the formatted value.
+ *
+ * @param option The configuration key which is configurable in {@link
+ * RocksDBConfigurableOptions}.
+ * @param value The value within given configuration.
+ */
+ public static void checkArgumentValid(ConfigOption<?> option, Object value) {
Review comment:
This method could be package-private.
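Same for `CANDIDATE_CONFIGS` above; dropping the `public` modifier would be enough (sketch, only the declarations shown):
~~~ java
// package-private: only the state backend classes in this package need these
static final ConfigOption<?>[] CANDIDATE_CONFIGS =
        new ConfigOption<?>[] {MAX_BACKGROUND_THREADS, MAX_OPEN_FILES /* ... */};

static void checkArgumentValid(ConfigOption<?> option, Object value) {
    // body unchanged
}
~~~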
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -500,15 +509,15 @@ private RocksDBOptionsFactory configureOptionsFactory(
return originalOptionsFactory;
}
- // if using DefaultConfigurableOptionsFactory by default, we could avoid reflection to speed
- // up.
- if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) {
- DefaultConfigurableOptionsFactory optionsFactory =
- new DefaultConfigurableOptionsFactory();
- optionsFactory.configure(config);
- LOG.info("Using default options factory: {}.", optionsFactory);
-
- return optionsFactory;
+ // From FLINK-24046, we deprecate the DefaultConfigurableOptionsFactory.
+ if (factoryClassName == null) {
+ return null;
+ } else if (factoryClassName.equalsIgnoreCase(
+ DefaultConfigurableOptionsFactory.class.getName())) {
+ LOG.info(
+ "{} is deprecated. Please remove this value from the configuration.",
Review comment:
I think we should also tell users if the configured `originalOptionsFactory` or `factoryClassName` actually extends `DefaultConfigurableOptionsFactory`. If so, we should print a related warning.
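Roughly like this (only a sketch; the variable name `optionsFactory` and the exact place where the instantiated factory is available are assumptions):
~~~ java
if (optionsFactory instanceof DefaultConfigurableOptionsFactory) {
    LOG.warn(
            "The configured options factory {} extends the deprecated "
                    + "DefaultConfigurableOptionsFactory; please migrate it to "
                    + "ConfigurableRocksDBOptionsFactory.",
            optionsFactory.getClass().getName());
}
~~~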
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
##########
@@ -65,7 +65,13 @@
* An implementation of {@link ConfigurableRocksDBOptionsFactory} using options provided by {@link
* RocksDBConfigurableOptions}. It acts as the default options factory within {@link
* EmbeddedRocksDBStateBackend} if the user did not define a {@link RocksDBOptionsFactory}.
+ *
+ * <p>After FLINK-24046, we refactor the config procedure for RocksDB. User could use {@link
+ * ConfigurableRocksDBOptionsFactory} to apply some customized options. Besides this, we load the
+ * configurable options in {@link RocksDBResourceContainer} instead of {@link
+ * DefaultConfigurableOptionsFactory}. Thus, we mark this factory Deprecated.
Review comment:
I think we should tell users explicitly that this class is ignored in the general case and is only kept for backward compatibility for users who still rely on it.
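For instance, the class Javadoc could add something like (wording only a suggestion):
~~~ java
/**
 * ...
 *
 * <p>Note: since FLINK-24046 this factory is ignored in the general case; it is only kept for
 * backward compatibility for users who still configure it explicitly.
 */
@Deprecated
public class DefaultConfigurableOptionsFactory implements ConfigurableRocksDBOptionsFactory {
~~~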