Myasuka commented on a change in pull request #17874:
URL: https://github.com/apache/flink/pull/17874#discussion_r757203945



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -50,26 +56,15 @@
      * <p>The following options are set:
      *
      * <ul>
-     *   <li>setUseFsync(false)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      */
-    DEFAULT {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setUseFsync(false)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
-
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-            return new ColumnFamilyOptions();
-        }
-    },
+    DEFAULT(
+            new HashMap<ConfigOption<?>, Object>() {
+                {
+                    put(RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL);
+                }
+            }),

Review comment:
       This could be simplified to `Collections.singletonMap(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL)`.
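
   For reference, a minimal sketch of how the `DEFAULT` constant could then look (assuming the enum constructor accepts a `Map<ConfigOption<?>, Object>`, as introduced in this patch):

   ~~~ java
   import java.util.Collections;

   // Sketch only: replace the anonymous HashMap with an immutable singleton map.
   DEFAULT(
           Collections.singletonMap(
                   RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.HEADER_LEVEL)),
   ~~~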

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -131,55 +115,35 @@ public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handles
      *   <li>setIncreaseParallelism(4)
      *   <li>setMinWriteBufferNumberToMerge(3)
      *   <li>setMaxWriteBufferNumber(4)
-     *   <li>setUseFsync(false)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      *   <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
      *   <li>BlockBasedTableConfigsetBlockSize(128 KBytes)

Review comment:
       Maybe we can also fix this to `BlockBasedTableConfig.setBlockSize(128 
KBytes)`

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -245,6 +268,121 @@ private boolean 
overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConf
         return true;
     }
 
+    /** Create a {@link DBOptions} for RocksDB, including some common 
settings. */
+    DBOptions createDBOptions() {

Review comment:
       I think this method should be renamed to `createBaseCommonDBOptions` 
with `private` scope.
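
   A minimal sketch of that rename (assuming the method is only called from within `RocksDBResourceContainer`):

   ~~~ java
   /** Create a {@link DBOptions} with the base settings common to all profiles. */
   private DBOptions createBaseCommonDBOptions() {
       return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
   }
   ~~~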

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -782,6 +791,29 @@ public void setWriteBatchSize(long writeBatchSize) {
     //  utilities
     // ------------------------------------------------------------------------
 
+    private ReadableConfig mergeConfigurableOptions(ReadableConfig base, 
ReadableConfig onTop) {
+        if (base == null) {
+            base = new Configuration();
+        }
+        if (onTop == null) {

Review comment:
       Actually, the `onTop` cannot be null.
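
   If that invariant holds, the null check could be replaced with an explicit precondition, e.g. (a sketch using Flink's `Preconditions`):

   ~~~ java
   import static org.apache.flink.util.Preconditions.checkNotNull;

   private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableConfig onTop) {
       if (base == null) {
           base = new Configuration();
       }
       checkNotNull(onTop, "The 'onTop' configuration must not be null.");
       // ... merge logic unchanged ...
   }
   ~~~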

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -55,23 +57,23 @@
     public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
             key("state.backend.rocksdb.thread.num")
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(1)

Review comment:
       I noticed that RocksDB has changed the default value to 2; I will create a hotfix for this.
   Once we fill `RocksDBConfigurableOptions` with default values, we should no longer say "RocksDB has a default configuration", because Flink then controls the configuration entirely.
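
   For illustration, the hotfix could look roughly like this (a sketch; the description text is only indicative):

   ~~~ java
   public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
           key("state.backend.rocksdb.thread.num")
                   .intType()
                   .defaultValue(2)
                   .withDescription(
                           "The maximum number of concurrent background flush and compaction jobs (per stateful operator).");
   ~~~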

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -245,6 +268,121 @@ private boolean 
overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConf
         return true;
     }
 
+    /** Create a {@link DBOptions} for RocksDB, including some common 
settings. */
+    DBOptions createDBOptions() {
+        return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
+    }
+
+    /** Create a {@link ColumnFamilyOptions} for RocksDB, including some 
common settings. */
+    ColumnFamilyOptions createColumnOptions() {

Review comment:
       I think this method should be renamed to `createBaseColumnOptions` with 
`private` scope.
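
   Likewise, a sketch of the column-family counterpart:

   ~~~ java
   /** Create a {@link ColumnFamilyOptions} with the base settings common to all profiles. */
   private ColumnFamilyOptions createBaseColumnOptions() {
       return new ColumnFamilyOptions();
   }
   ~~~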

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -782,6 +791,29 @@ public void setWriteBatchSize(long writeBatchSize) {
     //  utilities
     // ------------------------------------------------------------------------
 
+    private ReadableConfig mergeConfigurableOptions(ReadableConfig base, 
ReadableConfig onTop) {
+        if (base == null) {
+            base = new Configuration();
+        }
+        if (onTop == null) {
+            onTop = new Configuration();
+        }
+        Configuration configuration = new Configuration();
+        for (ConfigOption<?> option : 
RocksDBConfigurableOptions.CANDIDATE_CONFIGS) {
+            Optional<?> baseValue = base.getOptional(option);
+            Optional<?> topValue = onTop.getOptional(option);
+
+            if (topValue.isPresent()) {
+                RocksDBConfigurableOptions.checkArgumentValid(option, 
topValue.get());
+                configuration.setString(option.key(), 
topValue.get().toString());
+            } else if (baseValue.isPresent()) {
+                RocksDBConfigurableOptions.checkArgumentValid(option, 
baseValue.get());
+                configuration.setString(option.key(), 
baseValue.get().toString());
+            }

Review comment:
       I think the code below looks better:
   
   ~~~ java
               if (topValue.isPresent() || baseValue.isPresent()) {
                   Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get();
                   RocksDBConfigurableOptions.checkArgumentValid(option, validValue);
                   configuration.setString(option.key(), validValue.toString());
               }
   ~~~

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -500,15 +509,15 @@ private RocksDBOptionsFactory configureOptionsFactory(
             return originalOptionsFactory;
         }
 
-        // if using DefaultConfigurableOptionsFactory by default, we could 
avoid reflection to speed
-        // up.
-        if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(config);
-            LOG.info("Using default options factory: {}.", optionsFactory);
-
-            return optionsFactory;
+        // From FLINK-24046, we deprecate the 
DefaultConfigurableOptionsFactory.
+        if (factoryClassName == null) {
+            return null;
+        } else if (factoryClassName.equalsIgnoreCase(
+                DefaultConfigurableOptionsFactory.class.getName())) {
+            LOG.info(
+                    "{} is deprecated. Please remove this value from the 
configuration.",

Review comment:
       I think we should tell users whether it is safe to remove this value from the configuration.
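
   If removal is indeed safe, the message could state that explicitly, e.g. (a sketch; the config key and wording are only suggestions):

   ~~~ java
   LOG.info(
           "{} is deprecated: the options it used to apply are now read directly from the "
                   + "configuration, so it is safe to remove "
                   + "'state.backend.rocksdb.options-factory' from your configuration.",
           DefaultConfigurableOptionsFactory.class.getName());
   ~~~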
   

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -248,4 +250,78 @@
                     .defaultValue(false)
                     .withDescription(
                             "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used.");
+
+    public static final ConfigOption<?>[] CANDIDATE_CONFIGS =

Review comment:
       This could be a package-private field.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
##########
@@ -83,35 +78,24 @@ public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handles
      *   <li>setCompactionStyle(CompactionStyle.LEVEL)
      *   <li>setLevelCompactionDynamicLevelBytes(true)
      *   <li>setIncreaseParallelism(4)
-     *   <li>setUseFsync(false)
      *   <li>setDisableDataSync(true)
      *   <li>setMaxOpenFiles(-1)
      *   <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-     *   <li>setStatsDumpPeriodSec(0)
      * </ul>
      *
      * <p>Note: Because Flink does not rely on RocksDB data on disk for 
recovery, there is no need
      * to sync data to stable storage.
      */
-    SPINNING_DISK_OPTIMIZED {
-
-        @Override
-        public DBOptions createDBOptions(Collection<AutoCloseable> 
handlesToClose) {
-            return new DBOptions()
-                    .setIncreaseParallelism(4)
-                    .setUseFsync(false)
-                    .setMaxOpenFiles(-1)
-                    .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
-                    .setStatsDumpPeriodSec(0);
-        }
-
-        @Override
-        public ColumnFamilyOptions 
createColumnOptions(Collection<AutoCloseable> handlesToClose) {
-            return new ColumnFamilyOptions()
-                    .setCompactionStyle(CompactionStyle.LEVEL)
-                    .setLevelCompactionDynamicLevelBytes(true);
-        }
-    },
+    SPINNING_DISK_OPTIMIZED(
+            new HashMap<ConfigOption<?>, Object>() {
+                {
+                    put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
+                    put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
+                    put(RocksDBConfigurableOptions.LOG_LEVEL, 
InfoLogLevel.HEADER_LEVEL);
+                    put(RocksDBConfigurableOptions.COMPACTION_STYLE, 
CompactionStyle.LEVEL);
+                    put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
true);
+                }

Review comment:
       If we choose the anonymous-class approach to create a new hash map, these anonymous classes should declare a `serialVersionUID`.
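
   If the double-brace initializer is kept, a sketch of the declaration:

   ~~~ java
   SPINNING_DISK_OPTIMIZED(
           new HashMap<ConfigOption<?>, Object>() {
               // The anonymous subclass is serializable, so declare a serialVersionUID.
               private static final long serialVersionUID = 1L;

               {
                   put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
                   // ... remaining entries unchanged ...
               }
           }),
   ~~~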

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -127,10 +147,13 @@ public Long getWriteBufferManagerCapacity() {
 
     /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances. */
     public ColumnFamilyOptions getColumnOptions() {
-        // initial options from pre-defined profile
-        ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions(handlesToClose);
+        // initial options from common profile
+        ColumnFamilyOptions opt = createColumnOptions();
         handlesToClose.add(opt);
 
+        // load configurable options on top of pre-defined profile
+        opt = getColumnFamilyOptionsFromConfigurableOptions(opt, 
handlesToClose);

Review comment:
       There is no need to reassign `opt` here.
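
   i.e. something like the following, assuming the helper mutates and returns the same instance:

   ~~~ java
   // load configurable options on top of the base profile (mutates opt in place)
   getColumnFamilyOptionsFromConfigurableOptions(opt, handlesToClose);
   ~~~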

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
##########
@@ -65,7 +65,13 @@
  * An implementation of {@link ConfigurableRocksDBOptionsFactory} using 
options provided by {@link
  * RocksDBConfigurableOptions}. It acts as the default options factory within 
{@link
  * EmbeddedRocksDBStateBackend} if the user did not define a {@link 
RocksDBOptionsFactory}.
+ *
+ * <p>After FLINK-24046, we refactor the config procedure for RocksDB. User 
could use {@link
+ * ConfigurableRocksDBOptionsFactory} to apply some customized options. 
Besides this, we load the
+ * configurable options in {@link RocksDBResourceContainer} instead of {@link
+ * DefaultConfigurableOptionsFactory}. Thus, we mark this factory Deprecated.
  */
+@Deprecated
 public class DefaultConfigurableOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {

Review comment:
       Once we deprecate this class, we should also remove its usage as much as possible in the project.
   I think we'd better remove its usage in `test_state_backend.py` and `RocksDBStateBackendConfigTest`.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
##########
@@ -91,10 +108,13 @@ public RocksDBResourceContainer(
 
     /** Gets the RocksDB {@link DBOptions} to be used for RocksDB instances. */
     public DBOptions getDbOptions() {
-        // initial options from pre-defined profile
-        DBOptions opt = predefinedOptions.createDBOptions(handlesToClose);
+        // initial options from common profile
+        DBOptions opt = createDBOptions();
         handlesToClose.add(opt);
 
+        // load configurable options on top of pre-defined profile
+        opt = getDBOptionsFromConfigurableOptions(opt, handlesToClose);

Review comment:
       There is no need to reassign `opt` here. Moreover, there is no need to 
pass `handlesToClose` in.
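
   A sketch of the simplified call, assuming the helper mutates the passed instance and its signature is adjusted to drop the handles parameter:

   ~~~ java
   // load configurable options on top of the base profile (mutates opt in place)
   getDBOptionsFromConfigurableOptions(opt);
   ~~~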

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
##########
@@ -248,4 +250,78 @@
                     .defaultValue(false)
                     .withDescription(
                             "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used.");
+
+    public static final ConfigOption<?>[] CANDIDATE_CONFIGS =
+            new ConfigOption<?>[] {
+                // configurable DBOptions
+                MAX_BACKGROUND_THREADS,
+                MAX_OPEN_FILES,
+                LOG_LEVEL,
+                LOG_MAX_FILE_SIZE,
+                LOG_FILE_NUM,
+                LOG_DIR,
+
+                // configurable ColumnFamilyOptions
+                COMPACTION_STYLE,
+                USE_DYNAMIC_LEVEL_SIZE,
+                TARGET_FILE_SIZE_BASE,
+                MAX_SIZE_LEVEL_BASE,
+                WRITE_BUFFER_SIZE,
+                MAX_WRITE_BUFFER_NUMBER,
+                MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
+                BLOCK_SIZE,
+                METADATA_BLOCK_SIZE,
+                BLOCK_CACHE_SIZE,
+                USE_BLOOM_FILTER,
+                BLOOM_FILTER_BITS_PER_KEY,
+                BLOOM_FILTER_BLOCK_BASED_MODE
+            };
+
+    private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
+            new HashSet<>(
+                    Arrays.asList(
+                            MAX_BACKGROUND_THREADS,
+                            LOG_FILE_NUM,
+                            MAX_WRITE_BUFFER_NUMBER,
+                            MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+    private static final Set<ConfigOption<?>> SIZE_CONFIG_SET =
+            new HashSet<>(
+                    Arrays.asList(
+                            TARGET_FILE_SIZE_BASE,
+                            MAX_SIZE_LEVEL_BASE,
+                            WRITE_BUFFER_SIZE,
+                            BLOCK_SIZE,
+                            METADATA_BLOCK_SIZE,
+                            BLOCK_CACHE_SIZE));
+
+    /**
+     * Helper method to check whether the (key,value) is valid through given 
configuration and
+     * returns the formatted value.
+     *
+     * @param option The configuration key which is configurable in {@link
+     *     RocksDBConfigurableOptions}.
+     * @param value The value within given configuration.
+     */
+    public static void checkArgumentValid(ConfigOption<?> option, Object 
value) {

Review comment:
       This method could be a package-private method.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -500,15 +509,15 @@ private RocksDBOptionsFactory configureOptionsFactory(
             return originalOptionsFactory;
         }
 
-        // if using DefaultConfigurableOptionsFactory by default, we could 
avoid reflection to speed
-        // up.
-        if 
(factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName()))
 {
-            DefaultConfigurableOptionsFactory optionsFactory =
-                    new DefaultConfigurableOptionsFactory();
-            optionsFactory.configure(config);
-            LOG.info("Using default options factory: {}.", optionsFactory);
-
-            return optionsFactory;
+        // From FLINK-24046, we deprecate the 
DefaultConfigurableOptionsFactory.
+        if (factoryClassName == null) {
+            return null;
+        } else if (factoryClassName.equalsIgnoreCase(
+                DefaultConfigurableOptionsFactory.class.getName())) {
+            LOG.info(
+                    "{} is deprecated. Please remove this value from the 
configuration.",

Review comment:
       I think we should also tell users if the configured `originalOptionsFactory` or `factoryClassName` actually extends `DefaultConfigurableOptionsFactory`. If so, please print a related warning.
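
   A sketch of such a check after the factory has been instantiated via reflection (the variable name `optionsFactory` and the warning text are illustrative):

   ~~~ java
   if (optionsFactory instanceof DefaultConfigurableOptionsFactory) {
       LOG.warn(
               "The configured options factory {} extends the deprecated {}. "
                       + "Please migrate it to implement ConfigurableRocksDBOptionsFactory directly.",
               factoryClassName,
               DefaultConfigurableOptionsFactory.class.getName());
   }
   ~~~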

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
##########
@@ -65,7 +65,13 @@
  * An implementation of {@link ConfigurableRocksDBOptionsFactory} using 
options provided by {@link
  * RocksDBConfigurableOptions}. It acts as the default options factory within 
{@link
  * EmbeddedRocksDBStateBackend} if the user did not define a {@link 
RocksDBOptionsFactory}.
+ *
+ * <p>After FLINK-24046, we refactor the config procedure for RocksDB. User 
could use {@link
+ * ConfigurableRocksDBOptionsFactory} to apply some customized options. 
Besides this, we load the
+ * configurable options in {@link RocksDBResourceContainer} instead of {@link
+ * DefaultConfigurableOptionsFactory}. Thus, we mark this factory Deprecated.

Review comment:
       I think we should tell users explicitly that this class is ignored in the general case and is only kept for backward compatibility for users who still rely on it.
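
   For instance, the Javadoc could end with something along these lines (wording is only a suggestion):

   ~~~ java
    * <p>This class is ignored in the general case and is only kept for backward compatibility:
    * it still takes effect if a user explicitly configures it (or a subclass of it) as the
    * options factory.
   ~~~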




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
