Myasuka commented on a change in pull request #17874:
URL: https://github.com/apache/flink/pull/17874#discussion_r758898982
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -191,68 +157,55 @@ public ColumnFamilyOptions
createColumnOptions(Collection<AutoCloseable> handles
*
* <ul>
* <li>setIncreaseParallelism(4)
- * <li>setUseFsync(false)
* <li>setDisableDataSync(true)
* <li>setMaxOpenFiles(-1)
* <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- * <li>setStatsDumpPeriodSec(0)
* </ul>
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for
recovery, there is no need
* to sync data to stable storage.
*/
- FLASH_SSD_OPTIMIZED {
-
- @Override
- public DBOptions createDBOptions(Collection<AutoCloseable>
handlesToClose) {
- return new DBOptions()
- .setIncreaseParallelism(4)
- .setUseFsync(false)
- .setMaxOpenFiles(-1)
- .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
- .setStatsDumpPeriodSec(0);
- }
+ FLASH_SSD_OPTIMIZED(
+ new HashMap<ConfigOption<?>, Object>() {
+ private static final long serialVersionUID =
462493450763181265L;
Review comment:
According to the [code
guide](https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization),
the `serialVersionUID` should be set to `1L`. The same applies to all predefined options.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory(
}
LOG.info("Using application-defined options factory: {}.",
originalOptionsFactory);
- return originalOptionsFactory;
- }
-
- // if using DefaultConfigurableOptionsFactory by default, we could
avoid reflection to speed
- // up.
- if
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
{
- DefaultConfigurableOptionsFactory optionsFactory =
- new DefaultConfigurableOptionsFactory();
- optionsFactory.configure(config);
- LOG.info("Using default options factory: {}.", optionsFactory);
-
- return optionsFactory;
- } else {
- try {
- Class<? extends RocksDBOptionsFactory> clazz =
- Class.forName(factoryClassName, false, classLoader)
- .asSubclass(RocksDBOptionsFactory.class);
-
- RocksDBOptionsFactory optionsFactory = clazz.newInstance();
- if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
- optionsFactory =
- ((ConfigurableRocksDBOptionsFactory)
optionsFactory).configure(config);
+ optionsFactory = originalOptionsFactory;
+ } else if (factoryClassName != null) {
+ // Do nothing if user does not define any factory class.
+ if (factoryClassName.equalsIgnoreCase(
+ DefaultConfigurableOptionsFactory.class.getName())) {
+ // From FLINK-24046, we deprecate the
DefaultConfigurableOptionsFactory.
+ LOG.warn(
+ "{} is deprecated. Please remove this value from the
configuration."
+ + "It is safe to do so since the configurable
options will be loaded "
+ + "in other place. For more information,
please refer to FLINK-24046.",
+ DefaultConfigurableOptionsFactory.class.getName());
+ } else {
+ try {
+ Class<? extends RocksDBOptionsFactory> clazz =
+ Class.forName(factoryClassName, false, classLoader)
+ .asSubclass(RocksDBOptionsFactory.class);
+
+ optionsFactory = clazz.newInstance();
+ if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
+ optionsFactory =
+ ((ConfigurableRocksDBOptionsFactory)
optionsFactory)
+ .configure(config);
+ }
+ LOG.info("Using configured options factory: {}.",
optionsFactory);
+
+ } catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured options factory class: " +
factoryClassName, e);
+ } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
+ throw new DynamicCodeLoadingException(
+ "The class configured under '"
+ + RocksDBOptions.OPTIONS_FACTORY.key()
+ + "' is not a valid options factory ("
+ + factoryClassName
+ + ')',
+ e);
}
- LOG.info("Using configured options factory: {}.",
optionsFactory);
-
- return optionsFactory;
- } catch (ClassNotFoundException e) {
- throw new DynamicCodeLoadingException(
- "Cannot find configured options factory class: " +
factoryClassName, e);
- } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
- throw new DynamicCodeLoadingException(
- "The class configured under '"
- + RocksDBOptions.OPTIONS_FACTORY.key()
- + "' is not a valid options factory ("
- + factoryClassName
- + ')',
- e);
}
}
+
+ if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) {
+ LOG.warn(
+ "{} is extending from {}, which is deprecated and will be
removed in "
+ + "future. It is highly recommended to directly
implement the "
Review comment:
Typo: `removed in future` should be `removed in the future`.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory(
}
LOG.info("Using application-defined options factory: {}.",
originalOptionsFactory);
- return originalOptionsFactory;
- }
-
- // if using DefaultConfigurableOptionsFactory by default, we could
avoid reflection to speed
- // up.
- if
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
{
- DefaultConfigurableOptionsFactory optionsFactory =
- new DefaultConfigurableOptionsFactory();
- optionsFactory.configure(config);
- LOG.info("Using default options factory: {}.", optionsFactory);
-
- return optionsFactory;
- } else {
- try {
- Class<? extends RocksDBOptionsFactory> clazz =
- Class.forName(factoryClassName, false, classLoader)
- .asSubclass(RocksDBOptionsFactory.class);
-
- RocksDBOptionsFactory optionsFactory = clazz.newInstance();
- if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
- optionsFactory =
- ((ConfigurableRocksDBOptionsFactory)
optionsFactory).configure(config);
+ optionsFactory = originalOptionsFactory;
+ } else if (factoryClassName != null) {
+ // Do nothing if user does not define any factory class.
+ if (factoryClassName.equalsIgnoreCase(
+ DefaultConfigurableOptionsFactory.class.getName())) {
+ // From FLINK-24046, we deprecate the
DefaultConfigurableOptionsFactory.
+ LOG.warn(
+ "{} is deprecated. Please remove this value from the
configuration."
+ + "It is safe to do so since the configurable
options will be loaded "
+ + "in other place. For more information,
please refer to FLINK-24046.",
+ DefaultConfigurableOptionsFactory.class.getName());
+ } else {
+ try {
+ Class<? extends RocksDBOptionsFactory> clazz =
+ Class.forName(factoryClassName, false, classLoader)
+ .asSubclass(RocksDBOptionsFactory.class);
+
+ optionsFactory = clazz.newInstance();
+ if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
+ optionsFactory =
+ ((ConfigurableRocksDBOptionsFactory)
optionsFactory)
+ .configure(config);
+ }
+ LOG.info("Using configured options factory: {}.",
optionsFactory);
+
+ } catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured options factory class: " +
factoryClassName, e);
+ } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
+ throw new DynamicCodeLoadingException(
+ "The class configured under '"
+ + RocksDBOptions.OPTIONS_FACTORY.key()
+ + "' is not a valid options factory ("
+ + factoryClassName
+ + ')',
+ e);
}
- LOG.info("Using configured options factory: {}.",
optionsFactory);
-
- return optionsFactory;
- } catch (ClassNotFoundException e) {
- throw new DynamicCodeLoadingException(
- "Cannot find configured options factory class: " +
factoryClassName, e);
- } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
- throw new DynamicCodeLoadingException(
- "The class configured under '"
- + RocksDBOptions.OPTIONS_FACTORY.key()
- + "' is not a valid options factory ("
- + factoryClassName
- + ')',
- e);
}
}
+
+ if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) {
Review comment:
```suggestion
if (optionsFactory instanceof DefaultConfigurableOptionsFactory) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]