Zakelly commented on a change in pull request #17874:
URL: https://github.com/apache/flink/pull/17874#discussion_r758904601
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory(
}
LOG.info("Using application-defined options factory: {}.",
originalOptionsFactory);
- return originalOptionsFactory;
- }
-
- // if using DefaultConfigurableOptionsFactory by default, we could
avoid reflection to speed
- // up.
- if
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
{
- DefaultConfigurableOptionsFactory optionsFactory =
- new DefaultConfigurableOptionsFactory();
- optionsFactory.configure(config);
- LOG.info("Using default options factory: {}.", optionsFactory);
-
- return optionsFactory;
- } else {
- try {
- Class<? extends RocksDBOptionsFactory> clazz =
- Class.forName(factoryClassName, false, classLoader)
- .asSubclass(RocksDBOptionsFactory.class);
-
- RocksDBOptionsFactory optionsFactory = clazz.newInstance();
- if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
- optionsFactory =
- ((ConfigurableRocksDBOptionsFactory)
optionsFactory).configure(config);
+ optionsFactory = originalOptionsFactory;
+ } else if (factoryClassName != null) {
+ // Do nothing if user does not define any factory class.
+ if (factoryClassName.equalsIgnoreCase(
+ DefaultConfigurableOptionsFactory.class.getName())) {
+ // From FLINK-24046, we deprecate the
DefaultConfigurableOptionsFactory.
+ LOG.warn(
+ "{} is deprecated. Please remove this value from the
configuration."
+ + "It is safe to do so since the configurable
options will be loaded "
+ + "in other place. For more information,
please refer to FLINK-24046.",
+ DefaultConfigurableOptionsFactory.class.getName());
+ } else {
+ try {
+ Class<? extends RocksDBOptionsFactory> clazz =
+ Class.forName(factoryClassName, false, classLoader)
+ .asSubclass(RocksDBOptionsFactory.class);
+
+ optionsFactory = clazz.newInstance();
+ if (optionsFactory instanceof
ConfigurableRocksDBOptionsFactory) {
+ optionsFactory =
+ ((ConfigurableRocksDBOptionsFactory)
optionsFactory)
+ .configure(config);
+ }
+ LOG.info("Using configured options factory: {}.",
optionsFactory);
+
+ } catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured options factory class: " +
factoryClassName, e);
+ } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
+ throw new DynamicCodeLoadingException(
+ "The class configured under '"
+ + RocksDBOptions.OPTIONS_FACTORY.key()
+ + "' is not a valid options factory ("
+ + factoryClassName
+ + ')',
+ e);
}
- LOG.info("Using configured options factory: {}.",
optionsFactory);
-
- return optionsFactory;
- } catch (ClassNotFoundException e) {
- throw new DynamicCodeLoadingException(
- "Cannot find configured options factory class: " +
factoryClassName, e);
- } catch (ClassCastException | InstantiationException |
IllegalAccessException e) {
- throw new DynamicCodeLoadingException(
- "The class configured under '"
- + RocksDBOptions.OPTIONS_FACTORY.key()
- + "' is not a valid options factory ("
- + factoryClassName
- + ')',
- e);
}
}
+
+ if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) {
Review comment:
My mistake
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]