Myasuka commented on a change in pull request #17874: URL: https://github.com/apache/flink/pull/17874#discussion_r758898982
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ########## @@ -191,68 +157,55 @@ public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handles * * <ul> * <li>setIncreaseParallelism(4) - * <li>setUseFsync(false) * <li>setDisableDataSync(true) * <li>setMaxOpenFiles(-1) * <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) - * <li>setStatsDumpPeriodSec(0) * </ul> * * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need * to sync data to stable storage. */ - FLASH_SSD_OPTIMIZED { - - @Override - public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) { - return new DBOptions() - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1) - .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) - .setStatsDumpPeriodSec(0); - } + FLASH_SSD_OPTIMIZED( + new HashMap<ConfigOption<?>, Object>() { + private static final long serialVersionUID = 462493450763181265L; Review comment: According to the [code guide](https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization), we should set the `serialVersionUID` to `1L`. The same applies to all predefined options. ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java ########## @@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory( } LOG.info("Using application-defined options factory: {}.", originalOptionsFactory); - return originalOptionsFactory; - } - - // if using DefaultConfigurableOptionsFactory by default, we could avoid reflection to speed - // up. 
- if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) { - DefaultConfigurableOptionsFactory optionsFactory = - new DefaultConfigurableOptionsFactory(); - optionsFactory.configure(config); - LOG.info("Using default options factory: {}.", optionsFactory); - - return optionsFactory; - } else { - try { - Class<? extends RocksDBOptionsFactory> clazz = - Class.forName(factoryClassName, false, classLoader) - .asSubclass(RocksDBOptionsFactory.class); - - RocksDBOptionsFactory optionsFactory = clazz.newInstance(); - if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) { - optionsFactory = - ((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config); + optionsFactory = originalOptionsFactory; + } else if (factoryClassName != null) { + // Do nothing if user does not define any factory class. + if (factoryClassName.equalsIgnoreCase( + DefaultConfigurableOptionsFactory.class.getName())) { + // From FLINK-24046, we deprecate the DefaultConfigurableOptionsFactory. + LOG.warn( + "{} is deprecated. Please remove this value from the configuration." + + "It is safe to do so since the configurable options will be loaded " + + "in other place. For more information, please refer to FLINK-24046.", + DefaultConfigurableOptionsFactory.class.getName()); + } else { + try { + Class<? 
extends RocksDBOptionsFactory> clazz = + Class.forName(factoryClassName, false, classLoader) + .asSubclass(RocksDBOptionsFactory.class); + + optionsFactory = clazz.newInstance(); + if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) { + optionsFactory = + ((ConfigurableRocksDBOptionsFactory) optionsFactory) + .configure(config); + } + LOG.info("Using configured options factory: {}.", optionsFactory); + + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured options factory class: " + factoryClassName, e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + RocksDBOptions.OPTIONS_FACTORY.key() + + "' is not a valid options factory (" + + factoryClassName + + ')', + e); } - LOG.info("Using configured options factory: {}.", optionsFactory); - - return optionsFactory; - } catch (ClassNotFoundException e) { - throw new DynamicCodeLoadingException( - "Cannot find configured options factory class: " + factoryClassName, e); - } catch (ClassCastException | InstantiationException | IllegalAccessException e) { - throw new DynamicCodeLoadingException( - "The class configured under '" - + RocksDBOptions.OPTIONS_FACTORY.key() - + "' is not a valid options factory (" - + factoryClassName - + ')', - e); } } + + if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) { + LOG.warn( + "{} is extending from {}, which is deprecated and will be removed in " + + "future. It is highly recommended to directly implement the " Review comment: Typo, should be `in the future`. 
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java ########## @@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory( } LOG.info("Using application-defined options factory: {}.", originalOptionsFactory); - return originalOptionsFactory; - } - - // if using DefaultConfigurableOptionsFactory by default, we could avoid reflection to speed - // up. - if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) { - DefaultConfigurableOptionsFactory optionsFactory = - new DefaultConfigurableOptionsFactory(); - optionsFactory.configure(config); - LOG.info("Using default options factory: {}.", optionsFactory); - - return optionsFactory; - } else { - try { - Class<? extends RocksDBOptionsFactory> clazz = - Class.forName(factoryClassName, false, classLoader) - .asSubclass(RocksDBOptionsFactory.class); - - RocksDBOptionsFactory optionsFactory = clazz.newInstance(); - if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) { - optionsFactory = - ((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config); + optionsFactory = originalOptionsFactory; + } else if (factoryClassName != null) { + // Do nothing if user does not define any factory class. + if (factoryClassName.equalsIgnoreCase( + DefaultConfigurableOptionsFactory.class.getName())) { + // From FLINK-24046, we deprecate the DefaultConfigurableOptionsFactory. + LOG.warn( + "{} is deprecated. Please remove this value from the configuration." + + "It is safe to do so since the configurable options will be loaded " + + "in other place. For more information, please refer to FLINK-24046.", + DefaultConfigurableOptionsFactory.class.getName()); + } else { + try { + Class<? 
extends RocksDBOptionsFactory> clazz = + Class.forName(factoryClassName, false, classLoader) + .asSubclass(RocksDBOptionsFactory.class); + + optionsFactory = clazz.newInstance(); + if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) { + optionsFactory = + ((ConfigurableRocksDBOptionsFactory) optionsFactory) + .configure(config); + } + LOG.info("Using configured options factory: {}.", optionsFactory); + + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured options factory class: " + factoryClassName, e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + RocksDBOptions.OPTIONS_FACTORY.key() + + "' is not a valid options factory (" + + factoryClassName + + ')', + e); } - LOG.info("Using configured options factory: {}.", optionsFactory); - - return optionsFactory; - } catch (ClassNotFoundException e) { - throw new DynamicCodeLoadingException( - "Cannot find configured options factory class: " + factoryClassName, e); - } catch (ClassCastException | InstantiationException | IllegalAccessException e) { - throw new DynamicCodeLoadingException( - "The class configured under '" - + RocksDBOptions.OPTIONS_FACTORY.key() - + "' is not a valid options factory (" - + factoryClassName - + ')', - e); } } + + if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) { Review comment: ```suggestion if (optionsFactory instanceof DefaultConfigurableOptionsFactory) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org