StefanRRichter commented on a change in pull request #7586: 
[FLINK-10912][rocksdb] Configurable RocksDBStateBackend options
URL: https://github.com/apache/flink/pull/7586#discussion_r258074857
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##########
 @@ -396,11 +400,155 @@ public void testPredefinedOptions() throws Exception {
                }
        }
 
+       @Test
+       public void testSetConfigurableOptions() throws Exception  {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertNull(rocksDbBackend.getOptions());
+
+               ConfigurableOptionsFactory customizedOptions = new 
ConfigurableOptionsFactory()
+                       .setMaxBackgroundThreads(4)
+                       .setMaxOpenFiles(-1)
+                       .setCompactionStyle(CompactionStyle.LEVEL)
+                       .setUseDynamicLevelSize(true)
+                       .setTargetFileSizeBase("4MB")
+                       .setMaxSizeLevelBase("128 mb")
+                       .setWriteBufferSize("128 MB")
+                       .setMaxWriteBufferNumber(4)
+                       .setMinWriteBufferNumberToMerge(3)
+                       .setBlockSize("64KB")
+                       .setBlockCacheSize("512mb");
+
+               rocksDbBackend.setOptions(customizedOptions);
+
+               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+                       assertEquals(-1, dbOptions.maxOpenFiles());
+               }
+
+               try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                       
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                       assertEquals(4 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                       assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                       assertEquals(4, columnOptions.maxWriteBufferNumber());
+                       assertEquals(3, 
columnOptions.minWriteBufferNumberToMerge());
+
+                       BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                       assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+                       assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+               }
+       }
+
+       @Test
+       public void testConfigurableOptionsFromConfig() throws IOException {
+               Configuration configuration = new Configuration();
+               
assertTrue(ConfigurableOptionsFactory.fromConfig(configuration).getConfiguredOptions().isEmpty());
+
+               // verify illegal configuration
+               {
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "-1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
+
+                       
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
+               }
+
+               // verify legal configuration
+               {
+                       
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
+                       
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
"TRUE");
+                       
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 
mb");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, 
"128MB");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
+                       
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "2");
+                       
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
+
+                       ConfigurableOptionsFactory customizedOptions = 
ConfigurableOptionsFactory.fromConfig(configuration);
+                       String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+                       rocksDbBackend.setOptions(customizedOptions);
+
+                       try (DBOptions dbOptions = 
rocksDbBackend.getDbOptions()) {
+                               assertEquals(-1, dbOptions.maxOpenFiles());
+                       }
+
+                       try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                               assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
+                               
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
+                               assertEquals(8 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
+                               assertEquals(128 * SizeUnit.MB, 
columnOptions.maxBytesForLevelBase());
+                               assertEquals(4, 
columnOptions.maxWriteBufferNumber());
+                               assertEquals(2, 
columnOptions.minWriteBufferNumberToMerge());
+                               assertEquals(64 * SizeUnit.MB, 
columnOptions.writeBufferSize());
+
+                               BlockBasedTableConfig tableConfig = 
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
+                               assertEquals(4 * SizeUnit.KB, 
tableConfig.blockSize());
+                               assertEquals(512 * SizeUnit.MB, 
tableConfig.blockCacheSize());
+                       }
+               }
+       }
+
+       /**
+        * This test verify configurations from flink-conf.yaml would be 
overrode by programmatically configured.
+        */
+       @Test
+       public void testConfigOptions() throws Exception  {
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.PREDEFINED_OPTIONS, 
PredefinedOptions.SPINNING_DISK_OPTIMIZED.name());
+               
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
+
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               // we re-configured predefined options as 
'PredefinedOptions.DEFAULT' programmatically.
+               rocksDbBackend.setPredefinedOptions(PredefinedOptions.DEFAULT);
+
+               ConfigurableOptionsFactory optionsFactory = new 
ConfigurableOptionsFactory().setMaxWriteBufferNumber(3);
+               // we re-configured max write-buffer number as '3' 
programmatically.
+               rocksDbBackend.setOptions(optionsFactory);
+
+               rocksDbBackend = rocksDbBackend.configure(configuration);
+
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+
+               try (ColumnFamilyOptions columnFamilyOptions = 
rocksDbBackend.getColumnOptions()){
+                       assertEquals(3, 
columnFamilyOptions.maxWriteBufferNumber());
+               }
+       }
+
        @Test
        public void testOptionsFactory() throws Exception {
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
 
+               // 1. verify configure user-defined options factory through 
flink-conf.yaml
+               Configuration config = new Configuration();
+               config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), 
TestOptionsFactory.class.getName());
+
+               rocksDbBackend = rocksDbBackend.configure(config);
+
+               final Environment env = 
getMockEnvironment(tempFolder.newFolder());
+               // call createKeyedStateBackend to instantiate the user-defined 
options factory.
+               createKeyedStateBackend(rocksDbBackend, env);
+
+               assertNotNull(rocksDbBackend.getOptions());
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.UNIVERSAL, 
colCreated.compactionStyle());
 
 Review comment:
   I will double-check the classloader question. My feeling is that this 
code should run on the client, which is already in a user-code context, so 
we could use the context classloader of the current thread. But I would 
prefer to verify this first, and then I will get back to you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to