Myasuka commented on a change in pull request #17874:
URL: https://github.com/apache/flink/pull/17874#discussion_r758898982



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -191,68 +157,55 @@ public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handles
      *
      * <ul>
      *   <li>setIncreaseParallelism(4)
-     *   <li>setUseFsync(false)
      *   <li>setDisableDataSync(true)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      *
      * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery, there is no need
      * to sync data to stable storage.
      */
-    FLASH_SSD_OPTIMIZED {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setIncreaseParallelism(4)
-                    .setUseFsync(false)
-                    .setMaxOpenFiles(-1)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
+    FLASH_SSD_OPTIMIZED(
+            new HashMap<ConfigOption<?>, Object>() {
+                private static final long serialVersionUID = 
462493450763181265L;

Review comment:
       According to the [code 
guide](https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization),
 we should make the `serialVersionUID` as `1L`. Same for all predefined options.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory(
             }
             LOG.info("Using application-defined options factory: {}.", 
originalOptionsFactory);
 
-            return originalOptionsFactory;
-        }
-
-        // if using DefaultConfigurableOptionsFactory by default, we could 
avoid reflection to speed
-        // up.
-        if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(config);
-            LOG.info("Using default options factory: {}.", optionsFactory);
-
-            return optionsFactory;
-        } else {
-            try {
-                Class<? extends RocksDBOptionsFactory> clazz =
-                        Class.forName(factoryClassName, false, classLoader)
-                                .asSubclass(RocksDBOptionsFactory.class);
-
-                RocksDBOptionsFactory optionsFactory = clazz.newInstance();
-                if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
-                    optionsFactory =
-                            ((ConfigurableRocksDBOptionsFactory) 
optionsFactory).configure(config);
+            optionsFactory = originalOptionsFactory;
+        } else if (factoryClassName != null) {
+            // Do nothing if user does not define any factory class.
+            if (factoryClassName.equalsIgnoreCase(
+                    DefaultConfigurableOptionsFactory.class.getName())) {
+                // From FLINK-24046, we deprecate the 
DefaultConfigurableOptionsFactory.
+                LOG.warn(
+                        "{} is deprecated. Please remove this value from the 
configuration."
+                                + "It is safe to do so since the configurable 
options will be loaded "
+                                + "in other place. For more information, 
please refer to FLINK-24046.",
+                        DefaultConfigurableOptionsFactory.class.getName());
+            } else {
+                try {
+                    Class<? extends RocksDBOptionsFactory> clazz =
+                            Class.forName(factoryClassName, false, classLoader)
+                                    .asSubclass(RocksDBOptionsFactory.class);
+
+                    optionsFactory = clazz.newInstance();
+                    if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
+                        optionsFactory =
+                                ((ConfigurableRocksDBOptionsFactory) 
optionsFactory)
+                                        .configure(config);
+                    }
+                    LOG.info("Using configured options factory: {}.", 
optionsFactory);
+
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured options factory class: " + 
factoryClassName, e);
+                } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + RocksDBOptions.OPTIONS_FACTORY.key()
+                                    + "' is not a valid options factory ("
+                                    + factoryClassName
+                                    + ')',
+                            e);
                 }
-                LOG.info("Using configured options factory: {}.", 
optionsFactory);
-
-                return optionsFactory;
-            } catch (ClassNotFoundException e) {
-                throw new DynamicCodeLoadingException(
-                        "Cannot find configured options factory class: " + 
factoryClassName, e);
-            } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
-                throw new DynamicCodeLoadingException(
-                        "The class configured under '"
-                                + RocksDBOptions.OPTIONS_FACTORY.key()
-                                + "' is not a valid options factory ("
-                                + factoryClassName
-                                + ')',
-                        e);
             }
         }
+
+        if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) {
+            LOG.warn(
+                    "{} is extending from {}, which is deprecated and will be 
removed in "
+                            + "future. It is highly recommended to directly 
implement the "

Review comment:
       Typo, should be `in the future`.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -497,45 +508,58 @@ private RocksDBOptionsFactory configureOptionsFactory(
             }
             LOG.info("Using application-defined options factory: {}.", 
originalOptionsFactory);
 
-            return originalOptionsFactory;
-        }
-
-        // if using DefaultConfigurableOptionsFactory by default, we could 
avoid reflection to speed
-        // up.
-        if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(config);
-            LOG.info("Using default options factory: {}.", optionsFactory);
-
-            return optionsFactory;
-        } else {
-            try {
-                Class<? extends RocksDBOptionsFactory> clazz =
-                        Class.forName(factoryClassName, false, classLoader)
-                                .asSubclass(RocksDBOptionsFactory.class);
-
-                RocksDBOptionsFactory optionsFactory = clazz.newInstance();
-                if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
-                    optionsFactory =
-                            ((ConfigurableRocksDBOptionsFactory) 
optionsFactory).configure(config);
+            optionsFactory = originalOptionsFactory;
+        } else if (factoryClassName != null) {
+            // Do nothing if user does not define any factory class.
+            if (factoryClassName.equalsIgnoreCase(
+                    DefaultConfigurableOptionsFactory.class.getName())) {
+                // From FLINK-24046, we deprecate the 
DefaultConfigurableOptionsFactory.
+                LOG.warn(
+                        "{} is deprecated. Please remove this value from the 
configuration."
+                                + "It is safe to do so since the configurable 
options will be loaded "
+                                + "in other place. For more information, 
please refer to FLINK-24046.",
+                        DefaultConfigurableOptionsFactory.class.getName());
+            } else {
+                try {
+                    Class<? extends RocksDBOptionsFactory> clazz =
+                            Class.forName(factoryClassName, false, classLoader)
+                                    .asSubclass(RocksDBOptionsFactory.class);
+
+                    optionsFactory = clazz.newInstance();
+                    if (optionsFactory instanceof 
ConfigurableRocksDBOptionsFactory) {
+                        optionsFactory =
+                                ((ConfigurableRocksDBOptionsFactory) 
optionsFactory)
+                                        .configure(config);
+                    }
+                    LOG.info("Using configured options factory: {}.", 
optionsFactory);
+
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured options factory class: " + 
factoryClassName, e);
+                } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
+                    throw new DynamicCodeLoadingException(
+                            "The class configured under '"
+                                    + RocksDBOptions.OPTIONS_FACTORY.key()
+                                    + "' is not a valid options factory ("
+                                    + factoryClassName
+                                    + ')',
+                            e);
                 }
-                LOG.info("Using configured options factory: {}.", 
optionsFactory);
-
-                return optionsFactory;
-            } catch (ClassNotFoundException e) {
-                throw new DynamicCodeLoadingException(
-                        "Cannot find configured options factory class: " + 
factoryClassName, e);
-            } catch (ClassCastException | InstantiationException | 
IllegalAccessException e) {
-                throw new DynamicCodeLoadingException(
-                        "The class configured under '"
-                                + RocksDBOptions.OPTIONS_FACTORY.key()
-                                + "' is not a valid options factory ("
-                                + factoryClassName
-                                + ')',
-                        e);
             }
         }
+
+        if (optionsFactory instanceof DefaultOperatorStateBackendBuilder) {

Review comment:
       ```suggestion
           if (optionsFactory instanceof DefaultConfigurableOptionsFactory) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to