azagrebin commented on a change in pull request #8479: [FLINK-11193][State 
Backends]Use user passed configuration overriding default configuration loading 
from file
URL: https://github.com/apache/flink/pull/8479#discussion_r321285268
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ##########
 @@ -172,6 +173,40 @@ public void testConfigureTimerService() throws Exception {
                keyedBackend.dispose();
        }
 
+       /**
+        * Validates that user custom configuration from code should override 
the flink-conf.yaml.
+        * @throws Exception
+        */
+       @Test
+       public void testConfigureTimerServiceLoadingFromApplication() throws 
Exception {
+               final Environment env = 
getMockEnvironment(tempFolder.newFolder());
+
+               RocksDBStateBackend backend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+               Configuration config = new Configuration();
+               config.setString(
+                       RocksDBOptions.TIMER_SERVICE_FACTORY,
+                       
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+               backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
+
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(backend, env);
+
+               Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, 
keyedBackend.getPriorityQueueFactory().getClass());
+
+               Configuration configFromConfFile = new Configuration();
+               configFromConfFile.setString(
+                       RocksDBOptions.TIMER_SERVICE_FACTORY,
+                       
RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
+
+               StateBackend loadedBackend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, configFromConfFile,
+                       Thread.currentThread().getContextClassLoader(), null);
+
+               assertTrue(loadedBackend instanceof RocksDBStateBackend);
+               final RocksDBStateBackend rocksDBStateBackend = 
(RocksDBStateBackend) loadedBackend;
+
+               RocksDBKeyedStateBackend<Integer> keyedBackend2 = 
createKeyedStateBackend(rocksDBStateBackend, env);
+               Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, 
keyedBackend2.getPriorityQueueFactory().getClass());
 
 Review comment:
   Please call `keyedBackend.dispose();` and `keyedBackend2.dispose();` at the end of the test so that both keyed backends are released.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to