Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4907#discussion_r146993120
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 ---
    @@ -48,75 +53,249 @@
     
        private final ClassLoader cl = getClass().getClassLoader();
     
    -   private final String backendKey = CoreOptions.STATE_BACKEND.key();
    +   private final String backendKey = 
CheckpointingOptions.STATE_BACKEND.key();
     
        // 
------------------------------------------------------------------------
    +   //  defaults
    +   // 
------------------------------------------------------------------------
     
        @Test
        public void testNoStateBackendDefined() throws Exception {
    -           assertNull(AbstractStateBackend.loadStateBackendFromConfig(new 
Configuration(), cl, null));
    +           assertNull(StateBackendLoader.loadStateBackendFromConfig(new 
Configuration(), cl, null));
        }
     
        @Test
        public void testInstantiateMemoryBackendByDefault() throws Exception {
    -           StateBackend backend = AbstractStateBackend
    -                           .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
    +           StateBackend backend =
    +                           
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), 
cl, null);
     
                assertTrue(backend instanceof MemoryStateBackend);
        }
     
        @Test
    -   public void testLoadMemoryStateBackend() throws Exception {
    -           // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
    -           // to guard against config-breaking changes of the name 
    +   public void testApplicationDefinedHasPrecedence() throws Exception {
    +           final StateBackend appBackend = 
Mockito.mock(StateBackend.class);
    +
                final Configuration config = new Configuration();
                config.setString(backendKey, "jobmanager");
     
    -           StateBackend backend = AbstractStateBackend
    -                           .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
    +           StateBackend backend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, 
null);
    +           assertEquals(appBackend, backend);
    +   }
     
    -           assertTrue(backend instanceof MemoryStateBackend);
    +   // 
------------------------------------------------------------------------
    +   //  Memory State Backend
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Validates loading a memory state backend from the cluster 
configuration.
    +    */
    +   @Test
    +   public void testLoadMemoryStateBackendNoParameters() throws Exception {
    +           // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
    +           // to guard against config-breaking changes of the name
    +
    +           final Configuration config1 = new Configuration();
    +           config1.setString(backendKey, "jobmanager");
    +
    +           final Configuration config2 = new Configuration();
    +           config2.setString(backendKey, 
MemoryStateBackendFactory.class.getName());
    +
    +           StateBackend backend1 = 
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
    +           StateBackend backend2 = 
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
    +
    +           assertTrue(backend1 instanceof MemoryStateBackend);
    +           assertTrue(backend2 instanceof MemoryStateBackend);
    +   }
    +
    +   /**
    +    * Validates loading a memory state backend with additional parameters 
from the cluster configuration.
    +    */
    +   @Test
    +   public void testLoadMemoryStateWithParameters() throws Exception {
    +           final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
    +           final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
    +           final Path expectedCheckpointPath = new Path(checkpointDir);
    +           final Path expectedSavepointPath = new Path(savepointDir);
    +
    +           // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
    +           // to guard against config-breaking changes of the name
    +
    +           final Configuration config1 = new Configuration();
    +           config1.setString(backendKey, "jobmanager");
    +           config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
    +           config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
    +
    +           final Configuration config2 = new Configuration();
    +           config2.setString(backendKey, 
MemoryStateBackendFactory.class.getName());
    +           config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
    +           config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
    +
    +           MemoryStateBackend backend1 = (MemoryStateBackend)
    +                           
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
    +           MemoryStateBackend backend2 = (MemoryStateBackend)
    +                           
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
    +
    +           assertNotNull(backend1);
    +           assertNotNull(backend2);
    +
    +           assertEquals(expectedCheckpointPath, 
backend1.getCheckpointPath());
    +           assertEquals(expectedCheckpointPath, 
backend2.getCheckpointPath());
    +           assertEquals(expectedSavepointPath, 
backend1.getSavepointPath());
    +           assertEquals(expectedSavepointPath, 
backend2.getSavepointPath());
    +   }
    +
    +   /**
    +    * Validates taking the application-defined memory state backend and 
adding with additional
    --- End diff --
    
    nit: superfluous "with"


---

Reply via email to