[
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219567#comment-16219567
]
ASF GitHub Bot commented on FLINK-5823:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4907#discussion_r146993120
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
---
@@ -48,75 +53,249 @@
private final ClassLoader cl = getClass().getClassLoader();
- private final String backendKey = CoreOptions.STATE_BACKEND.key();
+ private final String backendKey =
CheckpointingOptions.STATE_BACKEND.key();
//
------------------------------------------------------------------------
+ // defaults
+ //
------------------------------------------------------------------------
@Test
public void testNoStateBackendDefined() throws Exception {
- assertNull(AbstractStateBackend.loadStateBackendFromConfig(new
Configuration(), cl, null));
+ assertNull(StateBackendLoader.loadStateBackendFromConfig(new
Configuration(), cl, null));
}
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
- StateBackend backend = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(new
Configuration(), cl, null);
+ StateBackend backend =
+
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(),
cl, null);
assertTrue(backend instanceof MemoryStateBackend);
}
@Test
- public void testLoadMemoryStateBackend() throws Exception {
- // we configure with the explicit string (rather than
AbstractStateBackend#X_STATE_BACKEND_NAME)
- // to guard against config-breaking changes of the name
+ public void testApplicationDefinedHasPrecedence() throws Exception {
+ final StateBackend appBackend =
Mockito.mock(StateBackend.class);
+
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager");
- StateBackend backend = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(new
Configuration(), cl, null);
+ StateBackend backend =
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl,
null);
+ assertEquals(appBackend, backend);
+ }
- assertTrue(backend instanceof MemoryStateBackend);
+ //
------------------------------------------------------------------------
+ // Memory State Backend
+ //
------------------------------------------------------------------------
+
+ /**
+ * Validates loading a memory state backend from the cluster
configuration.
+ */
+ @Test
+ public void testLoadMemoryStateBackendNoParameters() throws Exception {
+ // we configure with the explicit string (rather than
AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+
+ final Configuration config1 = new Configuration();
+ config1.setString(backendKey, "jobmanager");
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey,
MemoryStateBackendFactory.class.getName());
+
+ StateBackend backend1 =
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+ StateBackend backend2 =
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+ assertTrue(backend1 instanceof MemoryStateBackend);
+ assertTrue(backend2 instanceof MemoryStateBackend);
+ }
+
+ /**
+ * Validates loading a memory state backend with additional parameters
from the cluster configuration.
+ */
+ @Test
+ public void testLoadMemoryStateWithParameters() throws Exception {
+ final String checkpointDir = new
Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new
Path(tmp.newFolder().toURI()).toString();
+ final Path expectedCheckpointPath = new Path(checkpointDir);
+ final Path expectedSavepointPath = new Path(savepointDir);
+
+ // we configure with the explicit string (rather than
AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+
+ final Configuration config1 = new Configuration();
+ config1.setString(backendKey, "jobmanager");
+ config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir);
+ config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey,
MemoryStateBackendFactory.class.getName());
+ config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir);
+ config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
+
+ MemoryStateBackend backend1 = (MemoryStateBackend)
+
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+ MemoryStateBackend backend2 = (MemoryStateBackend)
+
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+ assertNotNull(backend1);
+ assertNotNull(backend2);
+
+ assertEquals(expectedCheckpointPath,
backend1.getCheckpointPath());
+ assertEquals(expectedCheckpointPath,
backend2.getCheckpointPath());
+ assertEquals(expectedSavepointPath,
backend1.getSavepointPath());
+ assertEquals(expectedSavepointPath,
backend2.getSavepointPath());
+ }
+
+ /**
+ * Validates taking the application-defined memory state backend and
adding with additional
--- End diff --
nit: superfluous "with"
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)