Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146993120 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java --- @@ -48,75 +53,249 @@ private final ClassLoader cl = getClass().getClassLoader(); - private final String backendKey = CoreOptions.STATE_BACKEND.key(); + private final String backendKey = CheckpointingOptions.STATE_BACKEND.key(); // ------------------------------------------------------------------------ + // defaults + // ------------------------------------------------------------------------ @Test public void testNoStateBackendDefined() throws Exception { - assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)); } @Test public void testInstantiateMemoryBackendByDefault() throws Exception { - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = + StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null); assertTrue(backend instanceof MemoryStateBackend); } @Test - public void testLoadMemoryStateBackend() throws Exception { - // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name + public void testApplicationDefinedHasPrecedence() throws Exception { + final StateBackend appBackend = Mockito.mock(StateBackend.class); + final Configuration config = new Configuration(); config.setString(backendKey, "jobmanager"); - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null); + assertEquals(appBackend, backend); + } - assertTrue(backend 
instanceof MemoryStateBackend); + // ------------------------------------------------------------------------ + // Memory State Backend + // ------------------------------------------------------------------------ + + /** + * Validates loading a memory state backend from the cluster configuration. + */ + @Test + public void testLoadMemoryStateBackendNoParameters() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, MemoryStateBackendFactory.class.getName()); + + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertTrue(backend1 instanceof MemoryStateBackend); + assertTrue(backend2 instanceof MemoryStateBackend); + } + + /** + * Validates loading a memory state backend with additional parameters from the cluster configuration. 
+ */ + @Test + public void testLoadMemoryStateWithParameters() throws Exception { + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + final Path expectedCheckpointPath = new Path(checkpointDir); + final Path expectedSavepointPath = new Path(savepointDir); + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, MemoryStateBackendFactory.class.getName()); + config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + + MemoryStateBackend backend1 = (MemoryStateBackend) + StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + MemoryStateBackend backend2 = (MemoryStateBackend) + StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertNotNull(backend1); + assertNotNull(backend2); + + assertEquals(expectedCheckpointPath, backend1.getCheckpointPath()); + assertEquals(expectedCheckpointPath, backend2.getCheckpointPath()); + assertEquals(expectedSavepointPath, backend1.getSavepointPath()); + assertEquals(expectedSavepointPath, backend2.getSavepointPath()); + } + + /** + * Validates taking the application-defined memory state backend and adding with additional --- End diff -- nit: superfluous "with" in "adding with additional" — the Javadoc should read "adding additional".
---