[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of 
the root state backend

This closes #3411


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3446e66a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3446e66a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3446e66a

Branch: refs/heads/master
Commit: 3446e66aac63a3dfdaf8cfd4a73bd80a7f038379
Parents: 5b7f21d
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 17 17:51:00 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 19:02:13 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  12 +-
 .../state/RocksDBStateBackendFactory.java       |  19 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |   2 +-
 .../CheckpointConfigHandlerTest.java            |   3 +
 .../checkpoint/CheckpointCoordinator.java       |  21 ++-
 .../runtime/executiongraph/ExecutionGraph.java  |   4 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  32 +++-
 .../jobgraph/tasks/JobSnapshottingSettings.java |  15 +-
 .../runtime/state/AbstractStateBackend.java     | 173 ++++++++++++++++++-
 .../runtime/state/StateBackendFactory.java      |  16 +-
 .../state/filesystem/FsStateBackend.java        |  31 +++-
 .../state/filesystem/FsStateBackendFactory.java |  22 +--
 .../flink/runtime/state/heap/package-info.java  |  23 +++
 .../runtime/state/internal/package-info.java    |  52 ++++++
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../checkpoint/CheckpointStatsTrackerTest.java  |   1 +
 .../checkpoint/CoordinatorShutdownTest.java     |   5 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   3 +-
 .../ArchivedExecutionGraphTest.java             |   3 +-
 .../tasks/JobSnapshottingSettingsTest.java      |   6 +
 .../jobmanager/JobManagerHARecoveryTest.java    |   1 +
 .../runtime/jobmanager/JobManagerTest.java      |   5 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../runtime/state/StateBackendLoadingTest.java  | 164 ++++++++++++++++++
 .../runtime/jobmanager/JobManagerITCase.scala   |   3 +
 .../api/graph/StreamGraphGenerator.java         |   2 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   1 +
 .../streaming/runtime/tasks/StreamTask.java     |  72 ++------
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  56 +++---
 .../streaming/runtime/StateBackendITCase.java   |   2 +-
 31 files changed, 609 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3fd5d0f..dd0e2f7 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,10 +29,12 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,7 +162,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
 
        private void lazyInitializeForJob(
                        Environment env,
-                       String operatorIdentifier) throws Exception {
+                       String operatorIdentifier) throws IOException {
 
                if (isInitialized) {
                        return;
@@ -193,7 +195,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        }
 
                        if (dirs.isEmpty()) {
-                               throw new Exception("No local storage 
directories available. " + errorMessage);
+                               throw new IOException("No local storage 
directories available. " + errorMessage);
                        } else {
                                initializedDbBasePaths = dirs.toArray(new 
File[dirs.size()]);
                        }
@@ -235,7 +237,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
 
                // first, make sure that the RocksDB JNI library is loaded
                // we do this explicitly here to have better error handling
@@ -437,7 +439,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        //  static library loading utilities
        // 
------------------------------------------------------------------------
 
-       private void ensureRocksDBIsLoaded(String tempDirectory) throws 
Exception {
+       private void ensureRocksDBIsLoaded(String tempDirectory) throws 
IOException {
                synchronized (RocksDBStateBackend.class) {
                        if (!rocksDbInitialized) {
 
@@ -488,7 +490,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                        }
                                }
 
-                               throw new Exception("Could not load the native 
RocksDB library", lastException);
+                               throw new IOException("Could not load the 
native RocksDB library", lastException);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index 5002272..bd9bcaa 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -15,24 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  * A factory that creates an {@link 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend}
  * from a configuration.
  */
-public class RocksDBStateBackendFactory implements 
StateBackendFactory<FsStateBackend> {
+public class RocksDBStateBackendFactory implements 
StateBackendFactory<RocksDBStateBackend> {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
 
@@ -44,9 +45,11 @@ public class RocksDBStateBackendFactory implements 
StateBackendFactory<FsStateBa
        public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.rocksdb.checkpointdir";
 
        @Override
-       public AbstractStateBackend createFromConfig(Configuration config) 
throws Exception {
-               String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-               String rocksdbLocalPath = 
config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+       public RocksDBStateBackend createFromConfig(Configuration config) 
+                       throws IllegalConfigurationException, IOException {
+
+               final String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+               final String rocksdbLocalPath = 
config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
 
                if (checkpointDirURI == null) {
                        throw new IllegalConfigurationException(
@@ -67,8 +70,8 @@ public class RocksDBStateBackendFactory implements 
StateBackendFactory<FsStateBa
                        return backend;
                }
                catch (IllegalArgumentException e) {
-                       throw new Exception("Cannot initialize RocksDB State 
Backend with URI '"
-                                                                       + 
checkpointDirURI + '.', e);
+                       throw new IllegalConfigurationException(
+                                       "Cannot initialize RocksDB State 
Backend with URI '" + checkpointDirURI + '.', e);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index b3b7dfc..1fdac65 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest {
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),
-                               500, 500, 50, 5, 
ExternalizedCheckpointSettings.none(), true));
+                               500, 500, 50, 5, 
ExternalizedCheckpointSettings.none(), null, true));
 
                        flink.waitForActorsToBeAlive();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e517c3c..95ced0a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -56,6 +56,7 @@ public class CheckpointConfigHandlerTest {
                        minPause,
                        maxConcurrent,
                        externalized,
+                       null,
                        true);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -92,6 +93,7 @@ public class CheckpointConfigHandlerTest {
                        1212L,
                        12,
                        ExternalizedCheckpointSettings.none(),
+                       null,
                        false); // at least once
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -122,6 +124,7 @@ public class CheckpointConfigHandlerTest {
                        1212L,
                        12,
                        externalizedSettings,
+                       null,
                        false); // at least once
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6da6f7d..0592e3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -85,6 +85,12 @@ public class CheckpointCoordinator {
        /** The job whose checkpoint this coordinator coordinates */
        private final JobID job;
 
+       /** Default checkpoint properties **/
+       private final CheckpointProperties checkpointProperties;
+
+       /** The executor used for asynchronous calls, like potentially blocking 
I/O */
+       private final Executor executor;
+       
        /** Tasks who need to be sent a message when a checkpoint is started */
        private final ExecutionVertex[] tasksToTrigger;
 
@@ -101,7 +107,9 @@ public class CheckpointCoordinator {
         * accessing this don't block the job manager actor and run 
asynchronously. */
        private final CompletedCheckpointStore completedCheckpointStore;
 
-       /** Default directory for persistent checkpoints; <code>null</code> if 
none configured. */
+       /** Default directory for persistent checkpoints; <code>null</code> if 
none configured.
+        * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
+       @Nullable
        private final String checkpointDirectory;
 
        /** A list of recent checkpoint IDs, to identify late messages (vs 
invalid ones) */
@@ -154,11 +162,6 @@ public class CheckpointCoordinator {
        @Nullable
        private CheckpointStatsTracker statsTracker;
 
-       /** Default checkpoint properties **/
-       private final CheckpointProperties checkpointProperties;
-
-       private final Executor executor;
-
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -173,7 +176,7 @@ public class CheckpointCoordinator {
                        ExecutionVertex[] tasksToCommitTo,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
-                       String checkpointDirectory,
+                       @Nullable String checkpointDirectory,
                        Executor executor) {
 
                // sanity checks
@@ -211,6 +214,8 @@ public class CheckpointCoordinator {
                this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.checkpointDirectory = checkpointDirectory;
+               this.executor = checkNotNull(executor);
+
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
                this.timer = new Timer("Checkpoint Timer", true);
@@ -229,8 +234,6 @@ public class CheckpointCoordinator {
                } catch (Throwable t) {
                        throw new RuntimeException("Failed to start checkpoint 
ID counter: " + t.getMessage(), t);
                }
-
-               this.executor = checkNotNull(executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ad4347d..a76a421 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -348,7 +349,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                return false;
        }
 
-       public void enableSnapshotCheckpointing(
+       public void enableCheckpointing(
                        long interval,
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
@@ -360,6 +361,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
                        String checkpointDir,
+                       StateBackend metadataStore,
                        CheckpointStatsTracker statsTracker) {
 
                // simple sanity checks

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c558e43..2a79302 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -37,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -71,8 +75,8 @@ public class ExecutionGraphBuilder {
                        MetricGroup metrics,
                        int parallelismForAutoMax,
                        Logger log)
-               throws JobExecutionException, JobException
-       {
+               throws JobExecutionException, JobException {
+
                checkNotNull(jobGraph, "job graph cannot be null");
 
                final String jobName = jobGraph.getName();
@@ -191,7 +195,28 @@ public class ExecutionGraphBuilder {
                        String externalizedCheckpointsDir = 
jobManagerConfig.getString(
                                        
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
 
-                       executionGraph.enableSnapshotCheckpointing(
+                       // load the state backend for checkpoint metadata.
+                       // if specified in the application, use from there, 
otherwise load from configuration
+                       final StateBackend metadataBackend;
+
+                       final StateBackend applicationConfiguredBackend = 
snapshotSettings.getDefaultStateBackend();
+                       if (applicationConfiguredBackend != null) {
+                               metadataBackend = applicationConfiguredBackend;
+
+                               log.info("Using application-defined state 
backend for checkpoint/savepoint metadata: {}.",
+                                               applicationConfiguredBackend);
+                       }
+                       else {
+                               try {
+                                       metadataBackend = AbstractStateBackend
+                                                       
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
+                               }
+                               catch (IllegalConfigurationException | 
IOException | DynamicCodeLoadingException e) {
+                                       throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend", e);
+                               }
+                       }
+
+                       executionGraph.enableCheckpointing(
                                        
snapshotSettings.getCheckpointInterval(),
                                        snapshotSettings.getCheckpointTimeout(),
                                        
snapshotSettings.getMinPauseBetweenCheckpoints(),
@@ -203,6 +228,7 @@ public class ExecutionGraphBuilder {
                                        checkpointIdCounter,
                                        completedCheckpoints,
                                        externalizedCheckpointsDir,
+                                       metadataBackend,
                                        checkpointStatsTracker);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 561ba89..233aa88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -50,6 +52,10 @@ public class JobSnapshottingSettings implements 
java.io.Serializable {
        /** Settings for externalized checkpoints. */
        private final ExternalizedCheckpointSettings 
externalizedCheckpointSettings;
 
+       /** The default state backend, if configured by the user in the job */
+       @Nullable
+       private final StateBackend defaultStateBackend;
+
        /**
         * Flag indicating whether exactly once checkpoint mode has been 
configured.
         * If <code>false</code>, at least once mode has been configured. This 
is
@@ -58,7 +64,7 @@ public class JobSnapshottingSettings implements 
java.io.Serializable {
         * UI.
         */
        private final boolean isExactlyOnce;
-       
+
        public JobSnapshottingSettings(
                        List<JobVertexID> verticesToTrigger,
                        List<JobVertexID> verticesToAcknowledge,
@@ -68,6 +74,7 @@ public class JobSnapshottingSettings implements 
java.io.Serializable {
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpoints,
                        ExternalizedCheckpointSettings 
externalizedCheckpointSettings,
+                       @Nullable StateBackend defaultStateBackend,
                        boolean isExactlyOnce) {
 
                // sanity checks
@@ -84,6 +91,7 @@ public class JobSnapshottingSettings implements 
java.io.Serializable {
                this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
                this.externalizedCheckpointSettings = 
requireNonNull(externalizedCheckpointSettings);
+               this.defaultStateBackend = defaultStateBackend;
                this.isExactlyOnce = isExactlyOnce;
        }
        
@@ -121,6 +129,11 @@ public class JobSnapshottingSettings implements 
java.io.Serializable {
                return externalizedCheckpointSettings;
        }
 
+       @Nullable
+       public StateBackend getDefaultStateBackend() {
+               return defaultStateBackend;
+       }
+
        public boolean isExactlyOnce() {
                return isExactlyOnce;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index a335e45..2cf20a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,20 +21,50 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An abstract base implementation of the {@link StateBackend} interface.
+ * 
+ * <p>
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, 
java.io.Serializable {
 
        private static final long serialVersionUID = 4620415814639230247L;
 
+       // 
------------------------------------------------------------------------
+       //  Configuration shortcut names
+       // 
------------------------------------------------------------------------
+
+       /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+       public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+       /** The shortcut configuration name for the FileSystem State backend */ 
+       public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+       /** The shortcut configuration name for the RocksDB State Backend */
+       public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+       // 
------------------------------------------------------------------------
+       //  State Backend - Persisting Byte Storage
+       // 
------------------------------------------------------------------------
+
        @Override
        public abstract CheckpointStreamFactory createStreamFactory(
                        JobID jobId,
@@ -46,6 +76,10 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
                        String operatorIdentifier,
                        @Nullable String targetLocation) throws IOException;
 
+       // 
------------------------------------------------------------------------
+       //  State Backend - State-Holding Backends
+       // 
------------------------------------------------------------------------
+
        @Override
        public abstract <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(
                        Environment env,
@@ -54,7 +88,7 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception;
+                       TaskKvStateRegistry kvStateRegistry) throws IOException;
 
        @Override
        public OperatorStateBackend createOperatorStateBackend(
@@ -63,4 +97,141 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
 
                return new 
DefaultOperatorStateBackend(env.getUserClassLoader());
        }
+
+       // 
------------------------------------------------------------------------
+       //  Loading the state backend from a configuration 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+        * in {@link CoreOptions#STATE_BACKEND}.
+        * 
+        * <p>The state backends can be specified either via their shortcut 
name, or via the class name
+        * of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+        * is instantiated (via its zero-argument constructor) and its
+        * {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+        *
+        * <p>Recognized shortcut names are '{@value 
AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}',
+        * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and
+        * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'.
+        * 
+        * @param config The configuration to load the state backend from
+        * @param classLoader The class loader that should be used to load the 
state backend
+        * @param logger Optionally, a logger to log actions to (may be null)
+        * 
+        * @return The instantiated state backend.
+        * 
+        * @throws DynamicCodeLoadingException
+        *             Thrown if a state backend factory is configured and the 
factory class was not
+        *             found or the factory could not be instantiated
+        * @throws IllegalConfigurationException
+        *             May be thrown by the StateBackendFactory when creating / 
configuring the state
+        *             backend in the factory
+        * @throws IOException
+        *             May be thrown by the StateBackendFactory when 
instantiating the state backend
+        */
+       public static StateBackend loadStateBackendFromConfig(
+                       Configuration config,
+                       ClassLoader classLoader,
+                       @Nullable Logger logger) throws 
IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+               checkNotNull(config, "config");
+               checkNotNull(classLoader, "classLoader");
+
+               final String backendName = 
config.getString(CoreOptions.STATE_BACKEND);
+               if (backendName == null) {
+                       return null;
+               }
+
+               // by default the factory class is the backend name 
+               String factoryClassName = backendName;
+
+               switch (backendName.toLowerCase()) {
+                       case MEMORY_STATE_BACKEND_NAME:
+                               if (logger != null) {
+                                       logger.info("State backend is set to 
heap memory (checkpoint to JobManager)");
+                               }
+                               return new MemoryStateBackend();
+
+                       case FS_STATE_BACKEND_NAME:
+                               FsStateBackend fsBackend = new 
FsStateBackendFactory().createFromConfig(config);
+                               if (logger != null) {
+                                       logger.info("State backend is set to 
heap memory (checkpoints to filesystem \"{}\")",
+                                                       
fsBackend.getBasePath());
+                               }
+                               return fsBackend;
+
+                       case ROCKSDB_STATE_BACKEND_NAME:
+                               factoryClassName = 
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
+                               // fall through to the 'default' case that uses 
reflection to load the backend
+                               // that way we can keep RocksDB in a separate 
module
+
+                       default:
+                               if (logger != null) {
+                                       logger.info("Loading state backend via 
factory {}", factoryClassName);
+                               }
+
+                               StateBackendFactory<?> factory;
+                               try {
+                                       @SuppressWarnings("rawtypes")
+                                       Class<? extends StateBackendFactory> 
clazz = 
+                                                       
Class.forName(factoryClassName, false, classLoader)
+                                                                       
.asSubclass(StateBackendFactory.class);
+
+                                       factory = clazz.newInstance();
+                               }
+                               catch (ClassNotFoundException e) {
+                                       throw new DynamicCodeLoadingException(
+                                                       "Cannot find configured 
state backend factory class: " + backendName, e);
+                               }
+                               catch (ClassCastException | 
InstantiationException | IllegalAccessException e) {
+                                       throw new 
DynamicCodeLoadingException("The class configured under '" +
+                                                       
CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
+                                                       backendName + ')', e);
+                               }
+                               
+                               return factory.createFromConfig(config);
+               }
+       }
+
+       /**
+        * Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+        * in {@link CoreOptions#STATE_BACKEND}. If no state backend is 
configures, this instantiates the
+        * default state backend (the {@link MemoryStateBackend}). 
+        *
+        * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, 
ClassLoader, Logger)} for details on
+        * how the state backend is loaded from the configuration.
+        *
+        * @param config The configuration to load the state backend from
+        * @param classLoader The class loader that should be used to load the 
state backend
+        * @param logger Optionally, a logger to log actions to (may be null)
+        *
+        * @return The instantiated state backend.
+        *
+        * @throws DynamicCodeLoadingException
+        *             Thrown if a state backend factory is configured and the 
factory class was not
+        *             found or the factory could not be instantiated
+        * @throws IllegalConfigurationException
+        *             May be thrown by the StateBackendFactory when creating / 
configuring the state
+        *             backend in the factory
+        * @throws IOException
+        *             May be thrown by the StateBackendFactory when 
instantiating the state backend
+        */
+       public static StateBackend loadStateBackendFromConfigOrCreateDefault(
+                       Configuration config,
+                       ClassLoader classLoader,
+                       @Nullable Logger logger) throws 
IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+               final StateBackend fromConfig = 
loadStateBackendFromConfig(config, classLoader, logger);
+
+               if (fromConfig != null) {
+                       return fromConfig;
+               }
+               else {
+                       if (logger != null) {
+                               logger.info("No state backend has been 
configured, using default state backend (Memory / JobManager)");
+                       }
+                       return new MemoryStateBackend();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index 39e7ed2..78c976a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -18,17 +18,24 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
  * A factory to create a specific state backend. The state backend creation 
gets a Configuration
  * object that can be used to read further config values.
  * 
+ * <p>The state backend factory is typically specified in the configuration to 
produce a
+ * configured state backend.
+ * 
  * @param <T> The type of the state backend created.
  */
-public interface StateBackendFactory<T extends AbstractStateBackend> extends 
Serializable {
+@PublicEvolving
+public interface StateBackendFactory<T extends StateBackend> extends 
Serializable {
 
        /**
         * Creates the state backend, optionally using the given configuration.
@@ -36,7 +43,10 @@ public interface StateBackendFactory<T extends 
AbstractStateBackend> extends Ser
         * @param config The Flink configuration (loaded by the TaskManager).
         * @return The created state backend. 
         * 
-        * @throws Exception Exceptions during instantiation can be forwarded.
+        * @throws IllegalConfigurationException
+        *             If the configuration misses critical values, or 
specifies invalid values
+        * @throws IOException
+        *             If the state backend initialization failed due to an I/O 
exception
         */
-       AbstractStateBackend createFromConfig(Configuration config) throws 
Exception;
+       T createFromConfig(Configuration config) throws 
IllegalConfigurationException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index b614d98..5e8a15d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,6 +36,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
  *
@@ -139,17 +141,14 @@ public class FsStateBackend extends AbstractStateBackend {
         *                             rather than in files
         * 
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        * @throws IllegalArgumentException Thrown, if the {@code 
fileStateSizeThreshold} is out of bounds.
         */
        public FsStateBackend(URI checkpointDataUri, int 
fileStateSizeThreshold) throws IOException {
-               if (fileStateSizeThreshold < 0) {
-                       throw new IllegalArgumentException("The threshold for 
file state size must be zero or larger.");
-               }
-               if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
-                       throw new IllegalArgumentException("The threshold for 
file state size cannot be larger than " +
-                               MAX_FILE_STATE_THRESHOLD);
-               }
+               checkArgument(fileStateSizeThreshold >= 0, "The threshold for 
file state size must be zero or larger.");
+               checkArgument(fileStateSizeThreshold <= 
MAX_FILE_STATE_THRESHOLD, 
+                               "The threshold for file state size cannot be 
larger than %s", MAX_FILE_STATE_THRESHOLD);
+
                this.fileStateThreshold = fileStateSizeThreshold;
-               
                this.basePath = validateAndNormalizeUri(checkpointDataUri);
        }
 
@@ -163,6 +162,19 @@ public class FsStateBackend extends AbstractStateBackend {
                return basePath;
        }
 
+       /**
+        * Gets the threshold below which state is stored as part of the 
metadata, rather than in files.
+        * This threshold ensures that the backend does not create a large 
amount of very small files,
+        * where potentially the file pointers are larger than the state itself.
+        * 
+        * <p>By default, this threshold is {@value 
#DEFAULT_FILE_STATE_THRESHOLD}.
+        * 
+        * @return The file size threshold, in bytes.
+        */
+       public int getMinFileSizeThreshold() {
+               return fileStateThreshold;
+       }
+
        // 
------------------------------------------------------------------------
        //  initialization and cleanup
        // 
------------------------------------------------------------------------
@@ -189,7 +201,8 @@ public class FsStateBackend extends AbstractStateBackend {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 042700c..4c933ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
+import java.io.IOException;
+
 /**
  * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
  * from a configuration.
@@ -35,28 +37,26 @@ public class FsStateBackendFactory implements 
StateBackendFactory<FsStateBackend
        /** The key under which the config stores the threshold for state to be 
store in memory,
         * rather than in files */
        public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
-       
-       
+
+
        @Override
-       public FsStateBackend createFromConfig(Configuration config) throws 
Exception {
-               String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-               int memoryThreshold = config.getInteger(
+       public FsStateBackend createFromConfig(Configuration config) throws 
IllegalConfigurationException {
+               final String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+               final int memoryThreshold = config.getInteger(
                        MEMORY_THRESHOLD_CONF_KEY, 
FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
-               
+
                if (checkpointDirURI == null) {
                        throw new IllegalConfigurationException(
                                        "Cannot create the file system state 
backend: The configuration does not specify the " +
                                                        "checkpoint directory 
'" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
                }
-               
+
                try {
                        Path path = new Path(checkpointDirURI);
                        return new FsStateBackend(path.toUri(), 
memoryThreshold);
                }
-               catch (IllegalArgumentException e) {
-                       throw new Exception("Cannot initialize File System 
State Backend with URI '"
-                                       + checkpointDirURI + '.', e);
+               catch (IOException | IllegalArgumentException e) {
+                       throw new IllegalConfigurationException("Invalid 
configuration for the state backend", e);
                }
-               
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
new file mode 100644
index 0000000..2f34ed8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes for key/value state backends that store 
the state
+ * on the JVM heap as objects.
+ */
+package org.apache.flink.runtime.state.heap;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
new file mode 100644
index 0000000..fcc4df9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package holds the classes of the <b>internal state type hierarchy</b>.
+ *
+ * <p>The internal state classes give access to the namespace getters and 
setters and access to
+ * additional functionality, like raw value access or state merging.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by 
Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used 
by the runtime and not
+ * intended to be used by user applications. These internal methods are 
considered of limited use to users and
+ * only confusing, and are usually not regarded as stable across releases.
+ *
+ * <p>Each specific type in the internal state hierarchy extends the type from 
the public
+ * state hierarchy. The following illustrates the relationship between the 
public- and the internal
+ * hierarchy at the example of a subset of the classes:
+ *
+ * <pre>
+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *          MergingState                   |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      |               +-----------   -----------------InternalListState
+ *      |                            |
+ *      +------------------InternalReducingState
+ * </pre>
+ */
+package org.apache.flink.runtime.state.internal;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2cc1164..6e6b034 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -90,7 +90,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+                       TaskKvStateRegistry kvStateRegistry) {
 
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 9a39182..7ab71cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -62,6 +62,7 @@ public class CheckpointStatsTrackerTest {
                        191929L,
                        123,
                        ExternalizedCheckpointSettings.none(),
+                       null,
                        false);
 
                CheckpointStatsTracker tracker = new CheckpointStatsTracker(

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 7949ef0..976da48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
 
 import org.junit.Test;
 
@@ -67,7 +66,7 @@ public class CoordinatorShutdownTest {
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
-                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), true));
+                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), null, true));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -126,7 +125,7 @@ public class CoordinatorShutdownTest {
 
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), true));
+                                       5000, 60000, 0L, Integer.MAX_VALUE, 
ExternalizedCheckpointSettings.none(), null, true));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 47e6826..8f565dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -106,7 +106,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        ClassLoader.getSystemClassLoader(),
                        new UnregisteredMetricsGroup());
 
-               executionGraph.enableSnapshotCheckpointing(
+               executionGraph.enableCheckpointing(
                                100,
                                100,
                                100,
@@ -118,6 +118,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                                counter,
                                store,
                                null,
+                               null,
                                CheckpointStatsTrackerTest.createTestTracker());
 
                JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 46ce3f4..3090172 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -112,7 +112,7 @@ public class ArchivedExecutionGraphTest {
                                mock(JobSnapshottingSettings.class),
                                new UnregisteredMetricsGroup());
 
-               runtimeGraph.enableSnapshotCheckpointing(
+               runtimeGraph.enableCheckpointing(
                        100,
                        100,
                        100,
@@ -124,6 +124,7 @@ public class ArchivedExecutionGraphTest {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
+                       null,
                        statsTracker);
 
                Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
index 667dbca..2508d5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class JobSnapshottingSettingsTest {
 
@@ -42,6 +45,7 @@ public class JobSnapshottingSettingsTest {
                        112,
                        12,
                        
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+                       new MemoryStateBackend(),
                        false);
 
                JobSnapshottingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
@@ -55,5 +59,7 @@ public class JobSnapshottingSettingsTest {
                
assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(),
 copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
                
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(),
 copy.getExternalizedCheckpointSettings().deleteOnCancellation());
                assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+               assertNotNull(copy.getDefaultStateBackend());
+               assertTrue(copy.getDefaultStateBackend().getClass() == 
MemoryStateBackend.class);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index cbb077c..115b06c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -225,6 +225,7 @@ public class JobManagerHARecoveryTest {
                                        0,
                                        1,
                                        ExternalizedCheckpointSettings.none(),
+                                       null,
                                        true));
 
                        
BlockingStatefulInvokable.initializeStaticHelpers(slots);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index c5f6d99..727fc65 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -829,6 +829,7 @@ public class JobManagerTest {
                                        0,
                                        Integer.MAX_VALUE,
                                        ExternalizedCheckpointSettings.none(),
+                                       null,
                                        true);
 
                        jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -954,6 +955,7 @@ public class JobManagerTest {
                                0,
                                Integer.MAX_VALUE,
                                ExternalizedCheckpointSettings.none(),
+                               null,
                                true);
 
                        jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1059,6 +1061,7 @@ public class JobManagerTest {
                                        0,
                                        Integer.MAX_VALUE,
                                        ExternalizedCheckpointSettings.none(),
+                                       null,
                                        true);
 
                        jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1161,6 +1164,7 @@ public class JobManagerTest {
                                        0,
                                        Integer.MAX_VALUE,
                                        ExternalizedCheckpointSettings.none(),
+                                       null,
                                        true);
 
                        jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1207,6 +1211,7 @@ public class JobManagerTest {
                                        0,
                                        Integer.MAX_VALUE,
                                        ExternalizedCheckpointSettings.none(),
+                                       null,
                                        true);
 
                        
newJobGraph.setSnapshotSettings(newSnapshottingSettings);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index feb3d4d..529c100 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -229,7 +229,7 @@ public class JobSubmitTest {
 
                JobGraph jg = new JobGraph("test job", jobVertex);
                jg.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-                       5000, 5000, 0L, 10, 
ExternalizedCheckpointSettings.none(), true));
+                       5000, 5000, 0L, 10, 
ExternalizedCheckpointSettings.none(), null, true));
                return jg;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
new file mode 100644
index 0000000..a64faf1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that state backends are properly loaded from 
configuration.
+ */
+public class StateBackendLoadingTest {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       private final ClassLoader cl = getClass().getClassLoader();
+
+       private final String backendKey = CoreOptions.STATE_BACKEND.key();
+
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testNoStateBackendDefined() throws Exception {
+               assertNull(AbstractStateBackend.loadStateBackendFromConfig(new 
Configuration(), cl, null));
+       }
+
+       @Test
+       public void testInstantiateMemoryBackendByDefault() throws Exception {
+               StateBackend backend = AbstractStateBackend
+                               .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+
+               assertTrue(backend instanceof MemoryStateBackend);
+       }
+
+       @Test
+       public void testLoadMemoryStateBackend() throws Exception {
+               // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+               // to guard against config-breaking changes of the name 
+               final Configuration config = new Configuration();
+               config.setString(backendKey, "jobmanager");
+
+               StateBackend backend = AbstractStateBackend
+                               .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+
+               assertTrue(backend instanceof MemoryStateBackend);
+       }
+
+       @Test
+       public void testLoadFileSystemStateBackend() throws Exception {
+               final String checkpointDir = new 
Path(tmp.getRoot().toURI()).toString();
+               final Path expectedPath = new Path(checkpointDir);
+               final int threshold = 1000000;
+
+               // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+               // to guard against config-breaking changes of the name 
+               final Configuration config1 = new Configuration();
+               config1.setString(backendKey, "filesystem");
+               config1.setString("state.checkpoints.dir", checkpointDir);
+               config1.setString("state.backend.fs.checkpointdir", 
checkpointDir);
+               config1.setInteger("state.backend.fs.memory-threshold", 
threshold);
+
+               final Configuration config2 = new Configuration();
+               config2.setString(backendKey, 
FsStateBackendFactory.class.getName());
+               config2.setString("state.checkpoints.dir", checkpointDir);
+               config2.setString("state.backend.fs.checkpointdir", 
checkpointDir);
+               config2.setInteger("state.backend.fs.memory-threshold", 
threshold);
+
+               StateBackend backend1 = AbstractStateBackend
+                               
.loadStateBackendFromConfigOrCreateDefault(config1, cl, null);
+
+               StateBackend backend2 = AbstractStateBackend
+                               
.loadStateBackendFromConfigOrCreateDefault(config2, cl, null);
+
+               assertTrue(backend1 instanceof FsStateBackend);
+               assertTrue(backend2 instanceof FsStateBackend);
+
+               FsStateBackend fs1 = (FsStateBackend) backend1;
+               FsStateBackend fs2 = (FsStateBackend) backend2;
+
+               assertEquals(expectedPath, fs1.getBasePath());
+               assertEquals(expectedPath, fs2.getBasePath());
+               assertEquals(threshold, fs1.getMinFileSizeThreshold());
+               assertEquals(threshold, fs2.getMinFileSizeThreshold());
+       }
+
+       /**
+        * This test makes sure that failures properly manifest when the state 
backend could not be loaded.
+        */
+       @Test
+       public void testLoadingFails() throws Exception {
+               final Configuration config = new Configuration();
+
+               // try a value that is neither recognized as a name, nor 
corresponds to a class
+               config.setString(backendKey, "does.not.exist");
+               try {
+                       
AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, 
null);
+                       fail("should fail with an exception");
+               } catch (DynamicCodeLoadingException ignored) {
+                       // expected
+               }
+
+               // try a class that is not a factory
+               config.setString(backendKey, java.io.File.class.getName());
+               try {
+                       
AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, 
null);
+                       fail("should fail with an exception");
+               } catch (DynamicCodeLoadingException ignored) {
+                       // expected
+               }
+
+               // a factory that fails
+               config.setString(backendKey, FailingFactory.class.getName());
+               try {
+                       
AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, 
null);
+                       fail("should fail with an exception");
+               } catch (IOException ignored) {
+                       // expected
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       static final class FailingFactory implements 
StateBackendFactory<StateBackend> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public StateBackend createFromConfig(Configuration config) 
throws IllegalConfigurationException, IOException {
+                       throw new IOException("fail!");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 75f1fd4..31e72dd 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -822,6 +822,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...
@@ -881,6 +882,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...
@@ -948,6 +950,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f55ff47..bd018c3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -82,7 +82,7 @@ public class StreamGraphGenerator {
        public static final int UPPER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
        // The StreamGraph that is being built, this is initialized at the 
beginning.
-       private StreamGraph streamGraph;
+       private final StreamGraph streamGraph;
 
        private final StreamExecutionEnvironment env;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a4bb165..003eff9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -539,6 +539,7 @@ public class StreamingJobGraphGenerator {
                                cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
                                cfg.getMaxConcurrentCheckpoints(),
                                externalizedCheckpointSettings,
+                               streamGraph.getStateBackend(),
                                isExactlyOnce);
 
                jobGraph.setSnapshotSettings(settings);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 938ffd2..1e208ee 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,9 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -43,13 +40,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -63,6 +57,7 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +142,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        private StreamConfig configuration;
 
        /** Our state backend. We use this to create checkpoint streams and a 
keyed state backend. */
-       private AbstractStateBackend stateBackend;
+       private StateBackend stateBackend;
 
        /** Keyed state backend for the head operator, if it is keyed. There 
can only ever be one. */
        private AbstractKeyedStateBackend<?> keyedStateBackend;
@@ -713,61 +708,20 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        //  State backend
        // 
------------------------------------------------------------------------
 
-       private AbstractStateBackend createStateBackend() throws Exception {
-               AbstractStateBackend stateBackend = 
configuration.getStateBackend(getUserCodeClassLoader());
+       private StateBackend createStateBackend() throws Exception {
+               final StateBackend fromJob = 
configuration.getStateBackend(getUserCodeClassLoader());
 
-               if (stateBackend != null) {
+               if (fromJob != null) {
                        // backend has been configured on the environment
                        LOG.info("Using user-defined state backend: {}.", 
stateBackend);
-               } else {
-                       // see if we have a backend specified in the 
configuration
-                       Configuration flinkConfig = 
getEnvironment().getTaskManagerInfo().getConfiguration();
-                       String backendName = 
flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
-
-                       if (backendName == null) {
-                               LOG.warn("No state backend has been specified, 
using default state backend (Memory / JobManager)");
-                               backendName = "jobmanager";
-                       }
-
-                       switch (backendName.toLowerCase()) {
-                               case "jobmanager":
-                                       LOG.info("State backend is set to heap 
memory (checkpoint to jobmanager)");
-                                       stateBackend = new MemoryStateBackend();
-                                       break;
-
-                               case "filesystem":
-                                       FsStateBackend backend = new 
FsStateBackendFactory().createFromConfig(flinkConfig);
-                                       LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \"{}\")",
-                                               backend.getBasePath());
-                                       stateBackend = backend;
-                                       break;
-
-                               case "rocksdb":
-                                       backendName = 
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
-                                       // fall through to the 'default' case 
that uses reflection to load the backend
-                                       // that way we can keep RocksDB in a 
separate module
-
-                               default:
-                                       try {
-                                               @SuppressWarnings("rawtypes")
-                                               Class<? extends 
StateBackendFactory> clazz =
-                                                               
Class.forName(backendName, false, getUserCodeClassLoader()).
-                                                                               
asSubclass(StateBackendFactory.class);
-
-                                               stateBackend = 
clazz.newInstance().createFromConfig(flinkConfig);
-                                       } catch (ClassNotFoundException e) {
-                                               throw new 
IllegalConfigurationException("Cannot find configured state backend: " + 
backendName);
-                                       } catch (ClassCastException e) {
-                                               throw new 
IllegalConfigurationException("The class configured under '" +
-                                                               
CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
-                                                               backendName + 
')');
-                                       } catch (Throwable t) {
-                                               throw new 
IllegalConfigurationException("Cannot create configured state backend", t);
-                                       }
-                       }
+                       return fromJob;
+               }
+               else {
+                       return 
AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
+                                       
getEnvironment().getTaskManagerInfo().getConfiguration(),
+                                       getUserCodeClassLoader(),
+                                       LOG);
                }
-
-               return stateBackend;
        }
 
        public OperatorStateBackend createOperatorStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 51294ce..e266ea1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -183,7 +183,7 @@ public class BlockingCheckpointsTest {
                public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                                Environment env, JobID jobID, String 
operatorIdentifier,
                                TypeSerializer<K> keySerializer, int 
numberOfKeyGroups,
-                               KeyGroupRange keyGroupRange, 
TaskKvStateRegistry kvStateRegistry) throws Exception {
+                               KeyGroupRange keyGroupRange, 
TaskKvStateRegistry kvStateRegistry) {
 
                        throw new UnsupportedOperationException();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3d01fdd..3826051 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -807,33 +807,39 @@ public class StreamTaskTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public AbstractStateBackend createFromConfig(Configuration 
config) throws Exception {
+               public AbstractStateBackend createFromConfig(Configuration 
config) {
                        AbstractStateBackend stateBackendMock = 
mock(AbstractStateBackend.class);
 
-                       
Mockito.when(stateBackendMock.createOperatorStateBackend(
-                                       Mockito.any(Environment.class),
-                                       Mockito.any(String.class)))
-                               .thenAnswer(new Answer<OperatorStateBackend>() {
-                                       @Override
-                                       public OperatorStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                               return 
Mockito.mock(OperatorStateBackend.class);
-                                       }
-                               });
-
-                       Mockito.when(stateBackendMock.createKeyedStateBackend(
-                                       Mockito.any(Environment.class),
-                                       Mockito.any(JobID.class),
-                                       Mockito.any(String.class),
-                                       Mockito.any(TypeSerializer.class),
-                                       Mockito.any(int.class),
-                                       Mockito.any(KeyGroupRange.class),
-                                       Mockito.any(TaskKvStateRegistry.class)))
-                               .thenAnswer(new 
Answer<AbstractKeyedStateBackend>() {
-                                       @Override
-                                       public AbstractKeyedStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                               return 
Mockito.mock(AbstractKeyedStateBackend.class);
-                                       }
-                               });
+                       try {
+                               
Mockito.when(stateBackendMock.createOperatorStateBackend(
+                                               Mockito.any(Environment.class),
+                                               Mockito.any(String.class)))
+                                       .thenAnswer(new 
Answer<OperatorStateBackend>() {
+                                               @Override
+                                               public OperatorStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                                       return 
Mockito.mock(OperatorStateBackend.class);
+                                               }
+                                       });
+       
+                               
Mockito.when(stateBackendMock.createKeyedStateBackend(
+                                               Mockito.any(Environment.class),
+                                               Mockito.any(JobID.class),
+                                               Mockito.any(String.class),
+                                               
Mockito.any(TypeSerializer.class),
+                                               Mockito.any(int.class),
+                                               
Mockito.any(KeyGroupRange.class),
+                                               
Mockito.any(TaskKvStateRegistry.class)))
+                                       .thenAnswer(new 
Answer<AbstractKeyedStateBackend>() {
+                                               @Override
+                                               public 
AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                                                       return 
Mockito.mock(AbstractKeyedStateBackend.class);
+                                               }
+                                       });
+                       }
+                       catch (Exception e) {
+                               // this is needed, because the signatures of 
the mocked methods throw 'Exception'
+                               throw new RuntimeException(e);
+                       }
 
                        return stateBackendMock;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 79665dd..4677242 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -109,7 +109,7 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                                TypeSerializer<K> keySerializer,
                                int numberOfKeyGroups,
                                KeyGroupRange keyGroupRange,
-                               TaskKvStateRegistry kvStateRegistry) throws 
Exception {
+                               TaskKvStateRegistry kvStateRegistry) throws 
IOException {
                        throw new SuccessException();
                }
        }

Reply via email to