http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
deleted file mode 100644
index 0086ac6..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-
-import java.io.Serializable;
-
-/**
- * State handler provider factory.
- *
- * <p>This is going to be superseded soon.
- */
-public class StateHandleProviderFactory {
-
-       /**
-        * Creates a {@link 
org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
-        * the configured recovery path.
-        */
-       public static <T extends Serializable> StateHandleProvider<T> 
createRecoveryFileStateHandleProvider(
-                       Configuration config) {
-
-               StateBackend stateBackend = StateBackend.fromConfig(config);
-
-               if (stateBackend == StateBackend.FILESYSTEM) {
-                       String recoveryPath = config.getString(
-                                       
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
-
-                       if (recoveryPath.equals("")) {
-                               throw new 
IllegalConfigurationException("Missing recovery path. Specify via " +
-                                               "configuration key '" + 
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
-                       }
-                       else {
-                               return 
FileStateHandle.createProvider(recoveryPath);
-                       }
-               }
-               else {
-                       throw new IllegalConfigurationException("Unexpected 
state backend configuration " +
-                                       stateBackend);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
new file mode 100644
index 0000000..32c601e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that produces an input stream when resolved.
+ */
+public interface StreamStateHandle extends StateHandle<InputStream> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
new file mode 100644
index 0000000..d64e2c4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+public abstract class AbstractFileState implements java.io.Serializable {
+       
+       private static final long serialVersionUID = 350284443258002355L;
+       
+       /** The path to the file in the filesystem, fully describing the file 
system */
+       private final Path filePath;
+
+       /** Cached file system handle */
+       private transient FileSystem fs;
+
+       /**
+        * Creates a new file state for the given file path.
+        * 
+        * @param filePath The path to the file that stores the state.
+        */
+       protected AbstractFileState(Path filePath) {
+               this.filePath = filePath;
+       }
+
+       /**
+        * Gets the path where this handle's state is stored.
+        * @return The path where this handle's state is stored.
+        */
+       public Path getFilePath() {
+               return filePath;
+       }
+
+       /**
+        * Discard the state by deleting the file that stores the state. If the 
parent directory
+        * of the state is empty after deleting the state file, it is also 
deleted.
+        * 
+        * @throws Exception Thrown, if the file deletion (not the directory 
deletion) fails.
+        */
+       public void discardState() throws Exception {
+               getFileSystem().delete(filePath, false);
+
+               // send a call to delete the directory containing the file. 
this will
+               // fail (and be ignored) when some files still exist
+               try {
+                       getFileSystem().delete(filePath.getParent(), false);
+               } catch (IOException ignored) {}
+       }
+
+       /**
+        * Gets the file system that stores the file state.
+        * @return The file system that stores the file state.
+        * @throws IOException Thrown if the file system cannot be accessed.
+        */
+       protected FileSystem getFileSystem() throws IOException {
+               if (fs == null) {
+                       fs = FileSystem.get(filePath.toUri());
+               }
+               return fs;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
new file mode 100644
index 0000000..b7e7cd1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+
+/**
+ * A state handle that points to state stored in a file via Java Serialization.
+ * 
+ * @param <T> The type of state pointed to by the state handle.
+ */
+public class FileSerializableStateHandle<T> extends AbstractFileState 
implements StateHandle<T> {
+
+       private static final long serialVersionUID = -657631394290213622L;
+       
+       /**
+        * Creates a new FileSerializableStateHandle pointing to state at the 
given file path.
+        * 
+        * @param filePath The path to the file containing the checkpointed 
state.
+        */
+       public FileSerializableStateHandle(Path filePath) {
+               super(filePath);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public T getState(ClassLoader classLoader) throws Exception {
+               FSDataInputStream inStream = 
getFileSystem().open(getFilePath());
+               ObjectInputStream ois = new 
InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+               return (T) ois.readObject();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
new file mode 100644
index 0000000..f4681ea
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that points to state in a file system, accessible as an 
input stream.
+ */
+public class FileStreamStateHandle extends AbstractFileState implements 
StreamStateHandle {
+       
+       private static final long serialVersionUID = -6826990484549987311L;
+
+       /**
+        * Creates a new FileStreamStateHandle pointing to state at the given 
file path.
+        * 
+        * @param filePath The path to the file containing the checkpointed 
state.
+        */
+       public FileStreamStateHandle(Path filePath) {
+               super(filePath);
+       }
+
+       @Override
+       public InputStream getState(ClassLoader userCodeClassLoader) throws 
Exception {
+               return getFileSystem().open(getFilePath());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
new file mode 100644
index 0000000..e3116dd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.runtime.state.AbstractHeapKvState;
+
+import java.io.DataOutputStream;
+import java.util.HashMap;
+
+/**
+ * Heap-backed key/value state that is snapshotted into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, 
FsStateBackend> {
+       
+       /** The file system state backend backing snapshots of this state */
+       private final FsStateBackend backend;
+       
+       /**
+        * Creates a new and empty key/value state.
+        * 
+        * @param keySerializer The serializer for the key.
+        * @param valueSerializer The serializer for the value.
+        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
+        * @param backend The file system state backend backing snapshots of 
this state
+        */
+       public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer,
+                                                       V defaultValue, 
FsStateBackend backend) {
+               super(keySerializer, valueSerializer, defaultValue);
+               this.backend = backend;
+       }
+
+       /**
+        * Creates a new key/value state with the given state contents.
+        * This method is used to re-create key/value state with existing data, 
for example from
+        * a snapshot.
+        * 
+        * @param keySerializer The serializer for the key.
+        * @param valueSerializer The serializer for the value.
+        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
+        * @param state The map of key/value pairs to initialize the state with.
+        * @param backend The file system state backend backing snapshots of 
this state
+        */
+       public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer,
+                                                       V defaultValue, 
HashMap<K, V> state, FsStateBackend backend) {
+               super(keySerializer, valueSerializer, defaultValue, state);
+               this.backend = backend;
+       }
+
+       
+       @Override
+       public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long 
timestamp) throws Exception {
+               // first, create an output stream to write to
+               try (FsStateBackend.FsCheckpointStateOutputStream out = 
+                                       
backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
+
+                       // serialize the state to the output stream
+                       OutputViewDataOutputStreamWrapper outView = 
+                                       new 
OutputViewDataOutputStreamWrapper(new DataOutputStream(out));
+                       outView.writeInt(size());
+                       writeStateToOutputView(outView);
+                       outView.flush();
+                       
+                       // create a handle to the state
+                       return new FsHeapKvStateSnapshot<>(getKeySerializer(), 
getValueSerializer(), out.closeAndGetPath());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
new file mode 100644
index 0000000..781ee3d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.DataInputStream;
+import java.util.HashMap;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements 
KvStateSnapshot<K, V, FsStateBackend> {
+       
+       private static final long serialVersionUID = 1L;
+
+       /** Name of the key serializer class */
+       private final String keySerializerClassName;
+
+       /** Name of the value serializer class */
+       private final String valueSerializerClassName;
+
+       /**
+        * Creates a new state snapshot with data in the file system.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param valueSerializer The serializer for the values.
+        * @param filePath The path where the snapshot data is stored.
+        */
+       public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer, Path filePath) {
+               super(filePath);
+               this.keySerializerClassName = 
keySerializer.getClass().getName();
+               this.valueSerializerClassName = 
valueSerializer.getClass().getName();
+       }
+
+       @Override
+       public FsHeapKvState<K, V> restoreState(
+                       FsStateBackend stateBackend,
+                       final TypeSerializer<K> keySerializer,
+                       final TypeSerializer<V> valueSerializer,
+                       V defaultValue,
+                       ClassLoader classLoader) throws Exception {
+
+               // validity checks
+               if 
(!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+                               
!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+                       throw new IllegalArgumentException(
+                                       "Cannot restore the state from the 
snapshot with the given serializers. " +
+                                                       "State (K/V) was 
serialized with (" + valueSerializerClassName +
+                                                       "/" + 
keySerializerClassName + ")");
+               }
+               
+               // state restore
+               try (FSDataInputStream inStream = 
stateBackend.getFileSystem().open(getFilePath())) {
+                       InputViewDataInputStreamWrapper inView = new 
InputViewDataInputStreamWrapper(new DataInputStream(inStream));
+                       
+                       final int numEntries = inView.readInt();
+                       HashMap<K, V> stateMap = new HashMap<>(numEntries);
+                       
+                       for (int i = 0; i < numEntries; i++) {
+                               K key = keySerializer.deserialize(inView);
+                               V value = valueSerializer.deserialize(inView);
+                               stateMap.put(key, value);
+                       }
+                       
+                       return new FsHeapKvState<K, V>(keySerializer, 
valueSerializer, defaultValue, stateMap, stateBackend);
+               }
+               catch (Exception e) {
+                       throw new Exception("Failed to restore state from file 
system", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
new file mode 100644
index 0000000..045c411
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+/**
+ * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
+ * 
+ * <p>The state backend has one core directory into which it puts all 
checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
+ * files for each state, for example:
+ * 
+ * {@code 
hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ */
+public class FsStateBackend extends StateBackend<FsStateBackend> {
+
+       private static final long serialVersionUID = -8191916350224044011L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(FsStateBackend.class);
+       
+       
+       /** The path to the directory for the checkpoint data, including the 
file system
+        * description via scheme and optional authority */
+       private final Path basePath;
+       
+       /** The directory (job specific) into this initialized instance of the 
backend stores its data */
+       private transient Path checkpointDirectory;
+       
+       /** Cached handle to the file system for file operations */
+       private transient FileSystem filesystem;
+
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to teh checkpoint data 
directory.
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(String checkpointDataUri) throws IOException {
+               this(new Path(checkpointDataUri));
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to teh checkpoint data 
directory.
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(Path checkpointDataUri) throws IOException {
+               this(checkpointDataUri.toUri());
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        * 
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        * 
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        * 
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to teh checkpoint data 
directory.
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(URI checkpointDataUri) throws IOException {
+               final String scheme = checkpointDataUri.getScheme();
+               final String path = checkpointDataUri.getPath();
+               
+               // some validity checks
+               if (scheme == null) {
+                       throw new IllegalArgumentException("The scheme 
(hdfs://, file://, etc) is null. " +
+                                       "Please specify the file system scheme 
explicitly in the URI.");
+               }
+               if (path == null) {
+                       throw new IllegalArgumentException("The path to store 
the checkpoint data in is null. " +
+                                       "Please specify a directory path for 
the checkpoint data.");
+               }
+               if (path.length() == 0 || path.equals("/")) {
+                       throw new IllegalArgumentException("Cannot use the root 
directory for checkpoints.");
+               }
+               
+               // we do a bit of work to make sure that the URI for the 
filesystem refers to exactly the same
+               // (distributed) filesystem on all hosts and includes full 
host/port information, even if the
+               // original URI did not include that. We count on the 
filesystem loading from the configuration
+               // to fill in the missing data.
+               
+               // try to grab the file system for this path/URI
+               this.filesystem = FileSystem.get(checkpointDataUri);
+               if (this.filesystem == null) {
+                       throw new IOException("Could not find a file system for 
the given scheme in the available configurations.");
+               }
+
+               URI fsURI = this.filesystem.getUri();
+               try {
+                       URI baseURI = new URI(fsURI.getScheme(), 
fsURI.getAuthority(), path, null, null);
+                       this.basePath = new Path(baseURI);
+               }
+               catch (URISyntaxException e) {
+                       throw new IOException(
+                                       String.format("Cannot create file 
system URI for checkpointDataUri %s and filesystem URI %s", 
+                                                       checkpointDataUri, 
fsURI), e);
+               }
+       }
+
+       /**
+        * Gets the base directory where all state-containing files are stored.
+        * The job specific directory is created inside this directory.
+        * 
+        * @return The base directory.
+        */
+       public Path getBasePath() {
+               return basePath;
+       }
+
+       /**
+        * Gets the directory where this state backend stores its checkpoint 
data. Will be null if
+        * the state backend has not been initialized.
+        * 
+        * @return The directory where this state backend stores its checkpoint 
data.
+        */
+       public Path getCheckpointDirectory() {
+               return checkpointDirectory;
+       }
+
+       /**
+        * Checks whether this state backend is initialized. Note that 
initialization does not carry
+        * across serialization. After each serialization, the state backend 
needs to be initialized.
+        * 
+        * @return True, if the file state backend has been initialized, false 
otherwise.
+        */
+       public boolean isInitialized() {
+               return filesystem != null && checkpointDirectory != null; 
+       }
+
+       /**
+        * Gets the file system handle for the file system that stores the 
state for this backend.
+        * 
+        * @return This backend's file system handle.
+        */
+       public FileSystem getFileSystem() {
+               if (filesystem != null) {
+                       return filesystem;
+               }
+               else {
+                       throw new IllegalStateException("State backend has not 
been initialized.");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  initialization and cleanup
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void initializeForJob(JobID jobId) throws Exception {
+               Path dir = new Path(basePath, jobId.toString());
+               
+               LOG.info("Initializing file state backend to URI " + dir);
+               
+               filesystem = basePath.getFileSystem();
+               filesystem.mkdirs(dir);
+
+               checkpointDirectory = dir;
+       }
+
+       @Override
+       public void disposeAllStateForCurrentJob() throws Exception {
+               FileSystem fs = this.filesystem;
+               Path dir = this.checkpointDirectory;
+               
+               if (fs != null && dir != null) {
+                       this.filesystem = null;
+                       this.checkpointDirectory = null;
+                       fs.delete(dir, true);
+               }
+               else {
+                       throw new IllegalStateException("state backend has not 
been initialized");
+               }
+       }
+
+       @Override
+       public void close() throws Exception {}
+
+       // 
------------------------------------------------------------------------
+       //  state backend operations
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public <K, V> FsHeapKvState<K, V> createKvState(
+                       TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer, V defaultValue) throws Exception {
+               return new FsHeapKvState<K, V>(keySerializer, valueSerializer, 
defaultValue, this);
+       }
+
+       @Override
+       public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
+                       S state, long checkpointID, long timestamp) throws 
Exception
+       {
+               checkFileSystemInitialized();
+
+               // make sure the directory for that specific checkpoint exists
+               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
+               filesystem.mkdirs(checkpointDir);
+
+               
+               Exception latestException = null;
+
+               for (int attempt = 0; attempt < 10; attempt++) {
+                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
+                       FSDataOutputStream outStream;
+                       try {
+                               outStream = filesystem.create(targetPath, 
false);
+                       }
+                       catch (Exception e) {
+                               latestException = e;
+                               continue;
+                       }
+
+                       ObjectOutputStream os = new 
ObjectOutputStream(outStream);
+                       os.writeObject(state);
+                       os.close();
+                       return new FileSerializableStateHandle<S>(targetPath);
+               }
+               
+               throw new Exception("Could not open output stream for state 
backend", latestException);
+       }
+       
+       @Override
+       public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
+               checkFileSystemInitialized();
+               
+               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
+               filesystem.mkdirs(checkpointDir);
+               
+               Exception latestException = null;
+               
+               for (int attempt = 0; attempt < 10; attempt++) {
+                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
+                       try {
+                               FSDataOutputStream outStream = 
filesystem.create(targetPath, false);
+                               return new 
FsCheckpointStateOutputStream(outStream, targetPath, filesystem);
+                       }
+                       catch (Exception e) {
+                               latestException = e;
+                       }
+               }
+               throw new Exception("Could not open output stream for state 
backend", latestException);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       private void checkFileSystemInitialized() throws IllegalStateException {
+               if (filesystem == null || checkpointDirectory == null) {
+                       throw new IllegalStateException("filesystem has not 
been re-initialized after deserialization");
+               }
+       }
+       
+       private Path createCheckpointDirPath(long checkpointID) {
+               return new Path(checkpointDirectory, "chk-" + checkpointID);
+       }
+       
+       @Override
+       public String toString() {
+               return checkpointDirectory == null ?
+                       "File State Backend @ " + basePath : 
+                       "File State Backend (initialized) @ " + 
checkpointDirectory;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Output stream for state checkpointing
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A CheckpointStateOutputStream that writes into a file and returns 
the path to that file upon
+        * closing.
+        */
+       public static final class FsCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
+
+               private final FSDataOutputStream outStream;
+               
+               private final Path filePath;
+               
+               private final FileSystem fs;
+               
+               private boolean closed;
+
+               FsCheckpointStateOutputStream(FSDataOutputStream outStream, 
Path filePath, FileSystem fs) {
+                       this.outStream = outStream;
+                       this.filePath = filePath;
+                       this.fs = fs;
+               }
+
+
+               @Override
+               public void write(int b) throws IOException {
+                       outStream.write(b);
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) throws 
IOException {
+                       outStream.write(b, off, len);
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       outStream.flush();
+               }
+
+               /**
+                * If the stream is only closed, we remove the produced file 
(cleanup through the auto close
+                * feature, for example). This method throws no exception if 
the deletion fails, but only
+                * logs the error.
+                */
+               @Override
+               public void close() {
+                       synchronized (this) {
+                               if (!closed) {
+                                       closed = true;
+                                       try {
+                                               outStream.close();
+                                               fs.delete(filePath, false);
+                                               
+                                               // attempt to delete the parent 
(will fail and be ignored if the parent has more files)
+                                               try {
+                                                       
fs.delete(filePath.getParent(), false);
+                                               } catch (IOException ignored) {}
+                                       }
+                                       catch (Exception e) {
+                                               LOG.warn("Cannot delete closed 
and discarded state stream to " + filePath, e);
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public FileStreamStateHandle closeAndGetHandle() throws 
IOException {
+                       return new FileStreamStateHandle(closeAndGetPath());
+               }
+
+               /**
+                * Closes the stream and returns the path to the file that 
contains the stream's data.
+                * @return The path to the file that contains the stream's data.
+                * @throws IOException Thrown if the stream cannot be 
successfully closed.
+                */
+               public Path closeAndGetPath() throws IOException {
+                       synchronized (this) {
+                               if (!closed) {
+                                       closed = true;
+                                       outStream.close();
+                                       return filePath;
+                               }
+                               else {
+                                       throw new IOException("Stream has 
already been closed and discarded.");
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
new file mode 100644
index 0000000..e687f7f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+/**
+ * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
+ * from a configuration.
+ */
+public class FsStateBackendFactory implements 
StateBackendFactory<FsStateBackend> {
+       
+       /** The key under which the config stores the directory where 
checkpoints should be stored */
+       public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
+       
+       
+       @Override
+       public FsStateBackend createFromConfig(Configuration config) throws 
Exception {
+               String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+
+               if (checkpointDirURI == null) {
+                       throw new IllegalConfigurationException(
+                                       "Cannot create the file system state 
backend: The configuration does not specify the " +
+                                                       "checkpoint directory 
'" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+               }
+               
+               try {
+                       Path path = new Path(checkpointDirURI);
+                       return new FsStateBackend(path);
+               }
+               catch (IllegalArgumentException e) {
+                       throw new Exception("Cannot initialize File System 
State Backend with URI '"
+                                       + checkpointDirURI + '.', e);
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
new file mode 100644
index 0000000..29762f7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A state handle that contains stream state in a byte array.
+ */
+public final class ByteStreamStateHandle implements StreamStateHandle {
+
+       private static final long serialVersionUID = -5280226231200217594L;
+       
+       /** the state data */
+       private final byte[] data;
+
+       /**
+        * Creates a new ByteStreamStateHandle containing the given data.
+        * 
+        * @param data The state data.
+        */
+       public ByteStreamStateHandle(byte[] data) {
+               this.data = data;
+       }
+
+       @Override
+       public InputStream getState(ClassLoader userCodeClassLoader) {
+               return new ByteArrayInputStream(data);
+       }
+
+       @Override
+       public void discardState() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
new file mode 100644
index 0000000..96cb440
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.runtime.state.AbstractHeapKvState;
+
+import java.util.HashMap;
+
+/**
+ * Heap-backed key/value state that is snapshotted into a serialized memory 
copy.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, 
MemoryStateBackend> {
+       
+       public MemHeapKvState(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer, V defaultValue) {
+               super(keySerializer, valueSerializer, defaultValue);
+       }
+
+       public MemHeapKvState(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer,
+                                                       V defaultValue, 
HashMap<K, V> state) {
+               super(keySerializer, valueSerializer, defaultValue, state);
+       }
+       
+       @Override
+       public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long 
timestamp) throws Exception {
+               DataOutputSerializer ser = new 
DataOutputSerializer(Math.max(size() * 16, 16));
+               writeStateToOutputView(ser);
+               byte[] bytes = ser.getCopyOfBuffer();
+               
+               return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), 
getValueSerializer(), bytes, size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
new file mode 100644
index 0000000..1b03def
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.util.HashMap;
+
+/**
+ * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored 
in a heap byte
+ * array, in serialized form.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, 
MemoryStateBackend> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       /** Name of the key serializer class */
+       private final String keySerializerClassName;
+
+       /** Name of the value serializer class */
+       private final String valueSerializerClassName;
+       
+       /** The serialized data of the state key/value pairs */
+       private final byte[] data;
+       
+       /** The number of key/value pairs */
+       private final int numEntries;
+
+       /**
+        * Creates a new heap memory state snapshot.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param valueSerializer The serializer for the values.
+        * @param data The serialized data of the state key/value pairs
+        * @param numEntries The number of key/value pairs
+        */
+       public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
+                                               TypeSerializer<V> 
valueSerializer, byte[] data, int numEntries) {
+               this.keySerializerClassName = 
keySerializer.getClass().getName();
+               this.valueSerializerClassName = 
valueSerializer.getClass().getName();
+               this.data = data;
+               this.numEntries = numEntries;
+       }
+
+
+       @Override
+       public MemHeapKvState<K, V> restoreState(
+                       MemoryStateBackend stateBackend,
+                       final TypeSerializer<K> keySerializer,
+                       final TypeSerializer<V> valueSerializer,
+                       V defaultValue,
+                       ClassLoader classLoader) throws Exception {
+
+               // validity checks
+               if 
(!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+                       
!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+                               throw new IllegalArgumentException(
+                                               "Cannot restore the state from 
the snapshot with the given serializers. " +
+                                               "State (K/V) was serialized 
with (" + valueSerializerClassName + 
+                                               "/" + keySerializerClassName + 
")");
+               }
+               
+               // restore state
+               HashMap<K, V> stateMap = new HashMap<>(numEntries);
+               DataInputDeserializer in = new DataInputDeserializer(data, 0, 
data.length);
+               
+               for (int i = 0; i < numEntries; i++) {
+                       K key = keySerializer.deserialize(in);
+                       V value = valueSerializer.deserialize(in);
+                       stateMap.put(key, value);
+               }
+               
+               return new MemHeapKvState<K, V>(keySerializer, valueSerializer, 
defaultValue, stateMap);
+       }
+
+       /**
+        * Discarding the heap state is a no-op.
+        */
+       @Override
+       public void discardState() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
new file mode 100644
index 0000000..8d297d4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link StateBackend} that stores all its data and checkpoints in memory 
and has no
+ * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
+ * transferred 
+ */
+public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
+
+       private static final long serialVersionUID = 4109305377809414635L;
+       
+       /** The default maximal size that the snapshotted memory state may have 
(5 MiBytes) */
+       private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+       
+       /** The maximal size that the snapshotted memory state may have */
+       private final int maxStateSize;
+
+       /**
+        * Creates a new memory state backend that accepts states whose 
serialized forms are
+        * up to the default state size (5 MB).
+        */
+       public MemoryStateBackend() {
+               this(DEFAULT_MAX_STATE_SIZE);
+       }
+
+       /**
+        * Creates a new memory state backend that accepts states whose 
serialized forms are
+        * up to the given number of bytes.
+        * 
+        * @param maxStateSize The maximal size of the serialized state
+        */
+       public MemoryStateBackend(int maxStateSize) {
+               this.maxStateSize = maxStateSize;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  initialization and cleanup
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void initializeForJob(JobID job) {
+               // nothing to do here
+       }
+
+       @Override
+       public void disposeAllStateForCurrentJob() {
+               // nothing to do here, GC will do it
+       }
+
+       @Override
+       public void close() throws Exception {}
+
+       // 
------------------------------------------------------------------------
+       //  State backend operations
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public <K, V> MemHeapKvState<K, V> createKvState(
+                       TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer, V defaultValue) {
+               return new MemHeapKvState<K, V>(keySerializer, valueSerializer, 
defaultValue);
+       }
+       
+       /**
+        * Serialized the given state into bytes using Java serialization and 
creates a state handle that
+        * can re-create that state.
+        * 
+        * @param state The state to checkpoint.
+        * @param checkpointID The ID of the checkpoint.
+        * @param timestamp The timestamp of the checkpoint.
+        * @param <S> The type of the state.
+        * 
+        * @return A state handle that contains the given state serialized as 
bytes.
+        * @throws Exception Thrown, if the serialization fails.
+        */
+       @Override
+       public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
+                       S state, long checkpointID, long timestamp) throws 
Exception
+       {
+               SerializedStateHandle<S> handle = new 
SerializedStateHandle<>(state);
+               checkSize(handle.getSizeOfSerializedState(), maxStateSize);
+               return new SerializedStateHandle<S>(state);
+       }
+
+       @Override
+       public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                       long checkpointID, long timestamp) throws Exception
+       {
+               return new MemoryCheckpointOutputStream(maxStateSize);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "MemoryStateBackend (data in heap memory / checkpoints 
to JobManager)";
+       }
+
+       static void checkSize(int size, int maxSize) throws IOException {
+               if (size > maxSize) {
+                       throw new IOException(
+                                       "Size of the state is larger than the 
maximum permitted memory-backed state. Size="
+                                                       + size + " , maxSize=" 
+ maxSize
+                                                       + " . Consider using a 
different state backend, like the File System State backend.");
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A CheckpointStateOutputStream that writes into a byte array.
+        */
+       public static final class MemoryCheckpointOutputStream extends 
CheckpointStateOutputStream {
+               
+               private final ByteArrayOutputStream os = new 
ByteArrayOutputStream();
+               
+               private final int maxSize;
+               
+               private boolean closed;
+
+               public MemoryCheckpointOutputStream(int maxSize) {
+                       this.maxSize = maxSize;
+               }
+
+               @Override
+               public void write(int b) {
+                       os.write(b);
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) {
+                       os.write(b, off, len);
+               }
+
+               // 
--------------------------------------------------------------------
+
+               @Override
+               public void close() {
+                       closed = true;
+                       os.reset();
+               }
+
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException 
{
+                       return new ByteStreamStateHandle(closeAndGetBytes());
+               }
+
+               /**
+                * Closes the stream and returns the byte array containing the 
stream's data.
+                * @return The byte array containing the stream's data.
+                * @throws IOException Thrown if the size of the data exceeds 
the maximal 
+                */
+               public byte[] closeAndGetBytes() throws IOException {
+                       if (!closed) {
+                               checkSize(os.size(), maxSize);
+                               byte[] bytes = os.toByteArray();
+                               close();
+                               return bytes;
+                       }
+                       else {
+                               throw new IllegalStateException("stream has 
already been closed");
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Static default instance
+       // 
------------------------------------------------------------------------
+       
+       /** The default instance of this state backend, using the default 
maximal state size */
+       private static final MemoryStateBackend DEFAULT_INSTANCE = new 
MemoryStateBackend();
+
+       /**
+        * Gets the default instance of this state backend, using the default 
maximal state size.
+        * @return The default instance of this state backend.
+        */
+       public static MemoryStateBackend defaultInstance() {
+               return DEFAULT_INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
new file mode 100644
index 0000000..c488dc9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+
+/**
+ * A state handle that represents its state in serialized form as bytes.
+ *
+ * @param <T> The type of state represented by this state handle.
+ */
+public class SerializedStateHandle<T> extends SerializedValue<T> implements 
StateHandle<T> {
+       
+       private static final long serialVersionUID = 4145685722538475769L;
+
+       public SerializedStateHandle(T value) throws IOException {
+               super(value);
+       }
+       
+       @Override
+       public T getState(ClassLoader classLoader) throws Exception {
+               return deserializeValue(classLoader);
+       }
+
+       /**
+        * Discarding heap-memory backed state is a no-op, so this method does 
nothing.
+        */
+       @Override
+       public void discardState() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 79b9b7e..a32fc65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,11 +34,14 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.state.StateHandleProviderFactory;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import 
org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 public class ZooKeeperUtils {
@@ -170,7 +173,7 @@ public class ZooKeeperUtils {
                String latchPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
                                ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
                String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
                return new ZooKeeperLeaderElectionService(client, latchPath, 
leaderPath);
        }
@@ -188,8 +191,7 @@ public class ZooKeeperUtils {
 
                checkNotNull(configuration, "Configuration");
 
-               StateHandleProvider<SubmittedJobGraph> stateHandleProvider =
-                               
StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+               StateStorageHelper<SubmittedJobGraph> stateStorage = 
createFileSystemStateStorage(configuration, "submittedJobGraph");
 
                // ZooKeeper submitted jobs root dir
                String zooKeeperSubmittedJobsPath = configuration.getString(
@@ -197,7 +199,7 @@ public class ZooKeeperUtils {
                                
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
                return new ZooKeeperSubmittedJobGraphStore(
-                               client, zooKeeperSubmittedJobsPath, 
stateHandleProvider);
+                               client, zooKeeperSubmittedJobsPath, 
stateStorage);
        }
 
        /**
@@ -219,21 +221,23 @@ public class ZooKeeperUtils {
 
                checkNotNull(configuration, "Configuration");
 
-               StateHandleProvider<CompletedCheckpoint> stateHandleProvider =
-                               
StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+               String checkpointsPath = configuration.getString(
+                       ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 
-               String completedCheckpointsPath = configuration.getString(
-                               ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 
-               completedCheckpointsPath += 
ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+               StateStorageHelper<CompletedCheckpoint> stateStorage = 
createFileSystemStateStorage(
+                       configuration,
+                       "completedCheckpoint");
+
+               checkpointsPath += 
ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
                return new ZooKeeperCompletedCheckpointStore(
                                maxNumberOfCheckpointsToRetain,
                                userClassLoader,
                                client,
-                               completedCheckpointsPath,
-                               stateHandleProvider);
+                               checkpointsPath,
+                               stateStorage);
        }
 
        /**
@@ -259,6 +263,30 @@ public class ZooKeeperUtils {
        }
 
        /**
+        * Creates a {@link FileSystemStateStorageHelper} instance.
+        *
+        * @param configuration {@link Configuration} object
+        * @param prefix Prefix for the created files
+        * @param <T> Type of the state objects
+        * @return {@link FileSystemStateStorageHelper} instance
+        * @throws IOException
+        */
+       private static <T extends Serializable> FileSystemStateStorageHelper<T> 
createFileSystemStateStorage(
+                       Configuration configuration,
+                       String prefix) throws IOException {
+
+               String rootPath = configuration.getString(
+                       ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+               if (rootPath.equals("")) {
+                       throw new IllegalConfigurationException("Missing 
recovery path. Specify via " +
+                               "configuration key '" + 
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+               } else {
+                       return new FileSystemStateStorageHelper<T>(rootPath, 
prefix);
+               }
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private ZooKeeperUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
new file mode 100644
index 0000000..d18cace
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.Serializable;
+
+/**
+ * State storage helper which is used by {@ZooKeeperStateHandleStore} to 
persiste state before
+ * the state handle is written to ZooKeeper.
+ *
+ * @param <T>
+ */
+public interface StateStorageHelper<T extends Serializable> {
+
+       /**
+        * Stores the given state and returns a state handle to it.
+        *
+        * @param state State to be stored
+        * @return State handle to the stored state
+        * @throws Exception
+        */
+       StateHandle<T> store(T state) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 936fe1b..6073a39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -23,12 +23,14 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -65,11 +67,12 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  */
 public class ZooKeeperStateHandleStore<T extends Serializable> {
 
+       public static Logger LOG = 
LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
+
        /** Curator ZooKeeper client */
        private final CuratorFramework client;
 
-       /** State handle provider */
-       private final StateHandleProvider<T> stateHandleProvider;
+       private final StateStorageHelper<T> storage;
 
        /**
         * Creates a {@link ZooKeeperStateHandleStore}.
@@ -78,14 +81,13 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         *                            expected that the client's namespace 
ensures that the root
         *                            path is exclusive for all state handles 
managed by this
         *                            instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code>
-        * @param stateHandleProvider The state handle provider for the state
         */
        public ZooKeeperStateHandleStore(
-                       CuratorFramework client,
-                       StateHandleProvider<T> stateHandleProvider) {
+               CuratorFramework client,
+               StateStorageHelper storage) throws IOException {
 
                this.client = checkNotNull(client, "Curator client");
-               this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+               this.storage = checkNotNull(storage, "State storage");
        }
 
        /**
@@ -112,12 +114,14 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @return Created {@link StateHandle}
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
-       public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode 
createMode) throws Exception {
+       public StateHandle<T> add(
+                       String pathInZooKeeper,
+                       T state,
+                       CreateMode createMode) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
 
-               // Create the state handle. Nothing persisted yet.
-               StateHandle<T> stateHandle = 
stateHandleProvider.createStateHandle(state);
+               StateHandle<T> stateHandle = storage.store(state);
 
                boolean success = false;
 
@@ -159,7 +163,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
                StateHandle<T> oldStateHandle = get(pathInZooKeeper);
 
-               StateHandle<T> stateHandle = 
stateHandleProvider.createStateHandle(state);
+               StateHandle<T> stateHandle = storage.store(state);
 
                boolean success = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
new file mode 100644
index 0000000..d6b69e4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper.filesystem;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
+import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * {@link StateStorageHelper} implementation which stores the state in the 
given filesystem path.
+ *
+ * @param <T>
+ */
+public class FileSystemStateStorageHelper<T extends Serializable> implements 
StateStorageHelper<T> {
+
+       private final Path rootPath;
+
+       private final String prefix;
+
+       private final FileSystem fs;
+
+       public FileSystemStateStorageHelper(String rootPath, String prefix) 
throws IOException {
+               this(new Path(rootPath), prefix);
+       }
+
+       public FileSystemStateStorageHelper(Path rootPath, String prefix) 
throws IOException {
+               this.rootPath = Preconditions.checkNotNull(rootPath, "Root 
path");
+               this.prefix = Preconditions.checkNotNull(prefix, "Prefix");
+
+               fs = FileSystem.get(rootPath.toUri());
+       }
+
+       @Override
+       public StateHandle<T> store(T state) throws Exception {
+               Exception latestException = null;
+
+               for (int attempt = 0; attempt < 10; attempt++) {
+                       Path filePath = getNewFilePath();
+                       FSDataOutputStream outStream;
+                       try {
+                               outStream = fs.create(filePath, false);
+                       }
+                       catch (Exception e) {
+                               latestException = e;
+                               continue;
+                       }
+
+                       try(ObjectOutputStream os = new 
ObjectOutputStream(outStream)) {
+                               os.writeObject(state);
+                       }
+
+                       return new FileSerializableStateHandle<>(filePath);
+               }
+
+               throw new Exception("Could not open output stream for state 
backend", latestException);
+       }
+
+       private Path getNewFilePath() {
+               return new Path(rootPath, FileUtils.getRandomFilename(prefix));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ebc0ea9..d9b69ad 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1542,30 +1542,25 @@ object JobManager {
       }
     }
 
-    val webMonitor: Option[WebMonitor] =
-      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
0) >= 0) {
-        val address = AkkaUtils.getAddress(jobManagerSystem)
+    val address = AkkaUtils.getAddress(jobManagerSystem)
 
-        configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.host.get)
-        configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
address.port.get)
+    configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.host.get)
+    configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
address.port.get)
 
-        // start the job manager web frontend
-        if 
(configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, 
false)) {
-          val leaderRetrievalService = LeaderRetrievalUtils
-            .createLeaderRetrievalService(configuration)
+    val webMonitor: Option[WebMonitor] =
+      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
0) >= 0) {
+        LOG.info("Starting JobManger web frontend")
+        val leaderRetrievalService = LeaderRetrievalUtils
+          .createLeaderRetrievalService(configuration)
 
-          LOG.info("Starting NEW JobManger web frontend")
-          // start the new web frontend. we need to load this dynamically
-          // because it is not in the same project/dependencies
-          Some(startWebRuntimeMonitor(configuration, leaderRetrievalService, 
jobManagerSystem))
-        }
-        else {
-          LOG.info("Starting JobManger web frontend")
+        // start the web frontend. we need to load this dynamically
+        // because it is not in the same project/dependencies
+        val webServer = WebMonitorUtils.startWebRuntimeMonitor(
+          configuration,
+          leaderRetrievalService,
+          jobManagerSystem)
 
-          // The old web frontend does not work with recovery mode
-          val leaderRetrievalService = 
StandaloneUtils.createLeaderRetrievalService(configuration)
-          Some(new WebInfoServer(configuration, leaderRetrievalService, 
jobManagerSystem))
-        }
+        Option(webServer)
       }
       else {
         None
@@ -1624,16 +1619,8 @@ object JobManager {
         monitor =>
           val jobManagerAkkaUrl = 
JobManager.getRemoteJobManagerAkkaURL(configuration)
           monitor.start(jobManagerAkkaUrl)
-        LOG.info("Starting JobManger web frontend")
-        // start the web frontend. we need to load this dynamically
-        // because it is not in the same project/dependencies
-        val webServer = WebMonitorUtils.startWebRuntimeMonitor(
-          configuration,
-          leaderRetrievalService,
-          jobManagerSystem)
       }
 
-
       (jobManagerSystem, jobManager, archive, webMonitor)
     }
     catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 4c6ddfd..dc6f550 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -56,8 +58,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpoint
                        ClassLoader userLoader) throws Exception {
 
                return new 
ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
-                               ZooKeeper.createClient(), CheckpointsPath, new 
LocalStateHandle
-                               
.LocalStateHandleProvider<CompletedCheckpoint>());
+                       ZooKeeper.createClient(), CheckpointsPath, new 
StateStorageHelper<CompletedCheckpoint>() {
+                       @Override
+                       public StateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
+                               return new LocalStateHandle<>(state);
+                       }
+               });
        }
 
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 4df8afb..ea4195c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,9 +27,9 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -46,23 +45,8 @@ import static org.junit.Assert.assertEquals;
 
 public class BlobLibraryCacheRecoveryITCase {
 
-       private File recoveryDir;
-
-       @Before
-       public void setUp() throws Exception {
-               recoveryDir = new File(FileUtils.getTempDirectory(), 
"BlobRecoveryITCaseDir");
-               if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
-                       throw new IllegalStateException("Failed to create temp 
directory for test");
-               }
-       }
-
-       @After
-       public void cleanUp() throws Exception {
-               if (recoveryDir != null) {
-                       FileUtils.deleteDirectory(recoveryDir);
-               }
-       }
-
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
        /**
         * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are 
recoverable from any
         * participating BlobLibraryCacheManager.
@@ -81,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
                        Configuration config = new Configuration();
                        config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
recoveryDir.getPath());
+                       
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);
@@ -170,7 +154,7 @@ public class BlobLibraryCacheRecoveryITCase {
                }
 
                // Verify everything is clean
-               File[] recoveryFiles = recoveryDir.listFiles();
+               File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
                assertEquals("Unclean state backend: " + 
Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 861a713..356ba36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -24,7 +24,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
-import 
org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -54,8 +56,13 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
 
        private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
 
-       private final static LocalStateHandleProvider<SubmittedJobGraph> 
StateHandleProvider =
-                       new LocalStateHandleProvider<>();
+       private final static StateStorageHelper<SubmittedJobGraph> 
localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
+               @Override
+               public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph 
state) throws Exception {
+                       return new LocalStateHandle<>(state);
+               }
+       };
+
 
        @AfterClass
        public static void tearDown() throws Exception {
@@ -72,8 +79,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        @Test
        public void testPutAndRemoveJobGraph() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testPutAndRemoveJobGraph",
-                               StateHandleProvider);
+                       ZooKeeper.createClient(),
+                       "/testPutAndRemoveJobGraph",
+                       localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -125,7 +133,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        @Test
        public void testRecoverJobGraphs() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", StateHandleProvider);
+                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -175,10 +183,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
 
                try {
                        jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", StateHandleProvider);
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
                        otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", StateHandleProvider);
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
 
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
@@ -234,10 +242,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
        @Test(expected = IllegalStateException.class)
        public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                ZooKeeperSubmittedJobGraphStore otherJobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                jobGraphs.start(null);
                otherJobGraphs.start(null);

Reply via email to