StefanRRichter commented on a change in pull request #7586: 
[FLINK-10912][rocksdb] Configurable RocksDBStateBackend options
URL: https://github.com/apache/flink/pull/7586#discussion_r258020089
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##########
 @@ -396,11 +400,155 @@ public void testPredefinedOptions() throws Exception {
                }
        }
 
+       @Test
+       public void testSetConfigurableOptions() throws Exception  {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertNull(rocksDbBackend.getOptions());
+
+               ConfigurableOptionsFactory customizedOptions = new 
ConfigurableOptionsFactory()
+                       .setMaxBackgroundThreads(4)
+                       .setMaxOpenFiles(-1)
+                       .setCompactionStyle(CompactionStyle.LEVEL)
+                       .setUseDynamicLevelSize(true)
+                       .setTargetFileSizeBase("4MB")
+                       .setMaxSizeLevelBase("128 mb")
+                       .setWriteBufferSize("128 MB")
+                       .setMaxWriteBufferNumber(4)
+                       .setMinWriteBufferNumberToMerge(3)
+                       .setBlockSize("64KB")
+                       .setBlockCacheSize("512mb");
+
+               rocksDbBackend.setOptions(customizedOptions);
+
+               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+                       assertEquals(-1, dbOptions.maxOpenFiles());
+               }
+
+               try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                       
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                       assertEquals(4 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                       assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                       assertEquals(4, columnOptions.maxWriteBufferNumber());
+                       assertEquals(3, 
columnOptions.minWriteBufferNumberToMerge());
+
+                       BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                       assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+                       assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+               }
+       }
+
+       @Test
+       public void testConfigurableOptionsFromConfig() throws IOException {
+               Configuration configuration = new Configuration();
+               
assertTrue(ConfigurableOptionsFactory.fromConfig(configuration).getConfiguredOptions().isEmpty());
+
+               // verify illegal configuration
+               {
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "-1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
+               }
+
+               // verify legal configuration
+               {
+                       
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
+                       
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
"TRUE");
+                       
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 
mb");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, 
"128MB");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
+                       
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "2");
+                       
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
+
+                       ConfigurableOptionsFactory customizedOptions = 
ConfigurableOptionsFactory.fromConfig(configuration);
+                       String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+                       rocksDbBackend.setOptions(customizedOptions);
+
+                       try (DBOptions dbOptions = 
rocksDbBackend.getDbOptions()) {
+                               assertEquals(-1, dbOptions.maxOpenFiles());
+                       }
+
+                       try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                               assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                               
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                               assertEquals(8 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                               assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                               assertEquals(4, 
columnOptions.maxWriteBufferNumber());
+                               assertEquals(2, 
columnOptions.minWriteBufferNumberToMerge());
+                               assertEquals(64 * SizeUnit.MB, 
columnOptions.writeBufferSize());
+
+                               BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                               assertEquals(4 * SizeUnit.KB, 
tableConfig.blockSize());
+                               assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+                       }
+               }
+       }
+
+       /**
+        * This test verify configurations from flink-conf.yaml would be 
overrode by programmatically configured.
+        */
+       @Test
+       public void testConfigOptions() throws Exception  {
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.PREDEFINED_OPTIONS, 
PredefinedOptions.SPINNING_DISK_OPTIMIZED.name());
+               
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
+
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               // we re-configured predefined options as 
'PredefinedOptions.DEFAULT' programmatically.
+               rocksDbBackend.setPredefinedOptions(PredefinedOptions.DEFAULT);
+
+               ConfigurableOptionsFactory optionsFactory = new 
ConfigurableOptionsFactory().setMaxWriteBufferNumber(3);
+               // we re-configured max write-buffer number as '3' 
programmatically.
+               rocksDbBackend.setOptions(optionsFactory);
+
+               rocksDbBackend = rocksDbBackend.configure(configuration);
+
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+
+               try (ColumnFamilyOptions columnFamilyOptions = 
rocksDbBackend.getColumnOptions()){
+                       assertEquals(3, 
columnFamilyOptions.maxWriteBufferNumber());
+               }
+       }
+
        @Test
        public void testOptionsFactory() throws Exception {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
 
+               // 1. verify configure user-defined options factory through 
flink-conf.yaml
+               Configuration config = new Configuration();
+               config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), 
TestOptionsFactory.class.getName());
+
+               rocksDbBackend = rocksDbBackend.configure(config);
+
+               final Environment env = 
getMockEnvironment(tempFolder.newFolder());
+               // call createKeyedStateBackend to instantiate the user-defined 
options factory.
+               createKeyedStateBackend(rocksDbBackend, env);
+
+               assertNotNull(rocksDbBackend.getOptions());
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.UNIVERSAL, 
colCreated.compactionStyle());
 
 Review comment:
   Why do we validate the successful setting of the options factory in such a 
complicated way? Wouldn't it be enough to just check that `getOptions` returns 
an instance of `TestOptionsFactory`? There should already be another test that 
validates the order in which setters override flink-conf, and so on. This would 
keep the test more targeted, or am I missing something?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to