http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
deleted file mode 100644
index 79512d7..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that points to state in a file system, accessible as an 
input stream.
- */
-public class FileStreamStateHandle extends AbstractFileState implements 
StreamStateHandle {
-       
-       private static final long serialVersionUID = -6826990484549987311L;
-
-       /**
-        * Creates a new FileStreamStateHandle pointing to state at the given 
file path.
-        * 
-        * @param filePath The path to the file containing the checkpointed 
state.
-        */
-       public FileStreamStateHandle(Path filePath) {
-               super(filePath);
-       }
-
-       @Override
-       public InputStream getState(ClassLoader userCodeClassLoader) throws 
Exception {
-               return getFileSystem().open(getFilePath());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
deleted file mode 100644
index 107a3be..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.io.DataOutputStream;
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into files.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, 
FsStateBackend> {
-       
-       /** The file system state backend backing snapshots of this state */
-       private final FsStateBackend backend;
-       
-       /**
-        * Creates a new and empty key/value state.
-        * 
-        * @param keySerializer The serializer for the key.
-        * @param valueSerializer The serializer for the value.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer,
-                                                       V defaultValue, 
FsStateBackend backend) {
-               super(keySerializer, valueSerializer, defaultValue);
-               this.backend = backend;
-       }
-
-       /**
-        * Creates a new key/value state with the given state contents.
-        * This method is used to re-create key/value state with existing data, 
for example from
-        * a snapshot.
-        * 
-        * @param keySerializer The serializer for the key.
-        * @param valueSerializer The serializer for the value.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        * @param state The map of key/value pairs to initialize the state with.
-        * @param backend The file system state backend backing snapshots of 
this state
-        */
-       public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer,
-                                                       V defaultValue, 
HashMap<K, V> state, FsStateBackend backend) {
-               super(keySerializer, valueSerializer, defaultValue, state);
-               this.backend = backend;
-       }
-
-       
-       @Override
-       public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long 
timestamp) throws Exception {
-               // first, create an output stream to write to
-               try (FsStateBackend.FsCheckpointStateOutputStream out = 
-                                       
backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-                       // serialize the state to the output stream
-                       OutputViewDataOutputStreamWrapper outView = 
-                                       new 
OutputViewDataOutputStreamWrapper(new DataOutputStream(out));
-                       outView.writeInt(size());
-                       writeStateToOutputView(outView);
-                       outView.flush();
-                       
-                       // create a handle to the state
-                       return new FsHeapKvStateSnapshot<>(getKeySerializer(), 
getValueSerializer(), out.closeAndGetPath());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
deleted file mode 100644
index c7117f8..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.io.DataInputStream;
-import java.util.HashMap;
-
-/**
- * A snapshot of a heap key/value state stored in a file.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements 
KvStateSnapshot<K, V, FsStateBackend> {
-       
-       private static final long serialVersionUID = 1L;
-
-       /** Name of the key serializer class */
-       private final String keySerializerClassName;
-
-       /** Name of the value serializer class */
-       private final String valueSerializerClassName;
-
-       /**
-        * Creates a new state snapshot with data in the file system.
-        *
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param filePath The path where the snapshot data is stored.
-        */
-       public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer, Path filePath) {
-               super(filePath);
-               this.keySerializerClassName = 
keySerializer.getClass().getName();
-               this.valueSerializerClassName = 
valueSerializer.getClass().getName();
-       }
-
-       @Override
-       public FsHeapKvState<K, V> restoreState(
-                       FsStateBackend stateBackend,
-                       final TypeSerializer<K> keySerializer,
-                       final TypeSerializer<V> valueSerializer,
-                       V defaultValue,
-                       ClassLoader classLoader) throws Exception {
-
-               // validity checks
-               if 
(!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-                               
!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-                       throw new IllegalArgumentException(
-                                       "Cannot restore the state from the 
snapshot with the given serializers. " +
-                                                       "State (K/V) was 
serialized with (" + valueSerializerClassName +
-                                                       "/" + 
keySerializerClassName + ")");
-               }
-               
-               // state restore
-               try (FSDataInputStream inStream = 
stateBackend.getFileSystem().open(getFilePath())) {
-                       InputViewDataInputStreamWrapper inView = new 
InputViewDataInputStreamWrapper(new DataInputStream(inStream));
-                       
-                       final int numEntries = inView.readInt();
-                       HashMap<K, V> stateMap = new HashMap<>(numEntries);
-                       
-                       for (int i = 0; i < numEntries; i++) {
-                               K key = keySerializer.deserialize(inView);
-                               V value = valueSerializer.deserialize(inView);
-                               stateMap.put(key, value);
-                       }
-                       
-                       return new FsHeapKvState<K, V>(keySerializer, 
valueSerializer, defaultValue, stateMap, stateBackend);
-               }
-               catch (Exception e) {
-                       throw new Exception("Failed to restore state from file 
system", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
deleted file mode 100644
index 3cbd227..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-
-/**
- * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
- * 
- * <p>The state backend has one core directory into which it puts all 
checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
- * files for each state, for example:
- * 
- * {@code 
hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
- */
-public class FsStateBackend extends StateBackend<FsStateBackend> {
-
-       private static final long serialVersionUID = -8191916350224044011L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(FsStateBackend.class);
-       
-       
-       /** The path to the directory for the checkpoint data, including the 
file system
-        * description via scheme and optional authority */
-       private final Path basePath;
-       
-       /** The directory (job specific) into this initialized instance of the 
backend stores its data */
-       private transient Path checkpointDirectory;
-       
-       /** Cached handle to the file system for file operations */
-       private transient FileSystem filesystem;
-
-
-       /**
-        * Creates a new state backend that stores its checkpoint data in the 
file system and location
-        * defined by the given URI.
-        *
-        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
-        * must be accessible via {@link FileSystem#get(URI)}.
-        *
-        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
-        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
-        * classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public FsStateBackend(String checkpointDataUri) throws IOException {
-               this(new Path(checkpointDataUri));
-       }
-
-       /**
-        * Creates a new state backend that stores its checkpoint data in the 
file system and location
-        * defined by the given URI.
-        *
-        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
-        * must be accessible via {@link FileSystem#get(URI)}.
-        *
-        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
-        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
-        * classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public FsStateBackend(Path checkpointDataUri) throws IOException {
-               this(checkpointDataUri.toUri());
-       }
-
-       /**
-        * Creates a new state backend that stores its checkpoint data in the 
file system and location
-        * defined by the given URI.
-        * 
-        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
-        * must be accessible via {@link FileSystem#get(URI)}.
-        * 
-        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
-        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
-        * classpath.
-        * 
-        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public FsStateBackend(URI checkpointDataUri) throws IOException {
-               final String scheme = checkpointDataUri.getScheme();
-               final String path = checkpointDataUri.getPath();
-               
-               // some validity checks
-               if (scheme == null) {
-                       throw new IllegalArgumentException("The scheme 
(hdfs://, file://, etc) is null. " +
-                                       "Please specify the file system scheme 
explicitly in the URI.");
-               }
-               if (path == null) {
-                       throw new IllegalArgumentException("The path to store 
the checkpoint data in is null. " +
-                                       "Please specify a directory path for 
the checkpoint data.");
-               }
-               if (path.length() == 0 || path.equals("/")) {
-                       throw new IllegalArgumentException("Cannot use the root 
directory for checkpoints.");
-               }
-               
-               // we do a bit of work to make sure that the URI for the 
filesystem refers to exactly the same
-               // (distributed) filesystem on all hosts and includes full 
host/port information, even if the
-               // original URI did not include that. We count on the 
filesystem loading from the configuration
-               // to fill in the missing data.
-               
-               // try to grab the file system for this path/URI
-               this.filesystem = FileSystem.get(checkpointDataUri);
-               if (this.filesystem == null) {
-                       throw new IOException("Could not find a file system for 
the given scheme in the available configurations.");
-               }
-
-               URI fsURI = this.filesystem.getUri();
-               try {
-                       URI baseURI = new URI(fsURI.getScheme(), 
fsURI.getAuthority(), path, null, null);
-                       this.basePath = new Path(baseURI);
-               }
-               catch (URISyntaxException e) {
-                       throw new IOException(
-                                       String.format("Cannot create file 
system URI for checkpointDataUri %s and filesystem URI %s", 
-                                                       checkpointDataUri, 
fsURI), e);
-               }
-       }
-
-       /**
-        * Gets the base directory where all state-containing files are stored.
-        * The job specific directory is created inside this directory.
-        * 
-        * @return The base directory.
-        */
-       public Path getBasePath() {
-               return basePath;
-       }
-
-       /**
-        * Gets the directory where this state backend stores its checkpoint 
data. Will be null if
-        * the state backend has not been initialized.
-        * 
-        * @return The directory where this state backend stores its checkpoint 
data.
-        */
-       public Path getCheckpointDirectory() {
-               return checkpointDirectory;
-       }
-
-       /**
-        * Checks whether this state backend is initialized. Note that 
initialization does not carry
-        * across serialization. After each serialization, the state backend 
needs to be initialized.
-        * 
-        * @return True, if the file state backend has been initialized, false 
otherwise.
-        */
-       public boolean isInitialized() {
-               return filesystem != null && checkpointDirectory != null; 
-       }
-
-       /**
-        * Gets the file system handle for the file system that stores the 
state for this backend.
-        * 
-        * @return This backend's file system handle.
-        */
-       public FileSystem getFileSystem() {
-               if (filesystem != null) {
-                       return filesystem;
-               }
-               else {
-                       throw new IllegalStateException("State backend has not 
been initialized.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  initialization and cleanup
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public void initializeForJob(JobID jobId) throws Exception {
-               Path dir = new Path(basePath, jobId.toString());
-               
-               LOG.info("Initializing file state backend to URI " + dir);
-               
-               filesystem = basePath.getFileSystem();
-               filesystem.mkdirs(dir);
-
-               checkpointDirectory = dir;
-       }
-
-       @Override
-       public void disposeAllStateForCurrentJob() throws Exception {
-               FileSystem fs = this.filesystem;
-               Path dir = this.checkpointDirectory;
-               
-               if (fs != null && dir != null) {
-                       this.filesystem = null;
-                       this.checkpointDirectory = null;
-                       fs.delete(dir, true);
-               }
-               else {
-                       throw new IllegalStateException("state backend has not 
been initialized");
-               }
-       }
-
-       @Override
-       public void close() throws Exception {}
-
-       // 
------------------------------------------------------------------------
-       //  state backend operations
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public <K, V> FsHeapKvState<K, V> createKvState(
-                       TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer, V defaultValue) throws Exception {
-               return new FsHeapKvState<K, V>(keySerializer, valueSerializer, 
defaultValue, this);
-       }
-
-       @Override
-       public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
-                       S state, long checkpointID, long timestamp) throws 
Exception
-       {
-               checkFileSystemInitialized();
-
-               // make sure the directory for that specific checkpoint exists
-               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
-               filesystem.mkdirs(checkpointDir);
-
-               
-               Exception latestException = null;
-
-               for (int attempt = 0; attempt < 10; attempt++) {
-                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
-                       FSDataOutputStream outStream;
-                       try {
-                               outStream = filesystem.create(targetPath, 
false);
-                       }
-                       catch (Exception e) {
-                               latestException = e;
-                               continue;
-                       }
-
-                       ObjectOutputStream os = new 
ObjectOutputStream(outStream);
-                       os.writeObject(state);
-                       os.close();
-                       return new FileSerializableStateHandle<S>(targetPath);
-               }
-               
-               throw new Exception("Could not open output stream for state 
backend", latestException);
-       }
-       
-       @Override
-       public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
-               checkFileSystemInitialized();
-               
-               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
-               filesystem.mkdirs(checkpointDir);
-               
-               Exception latestException = null;
-               
-               for (int attempt = 0; attempt < 10; attempt++) {
-                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
-                       try {
-                               FSDataOutputStream outStream = 
filesystem.create(targetPath, false);
-                               return new 
FsCheckpointStateOutputStream(outStream, targetPath, filesystem);
-                       }
-                       catch (Exception e) {
-                               latestException = e;
-                       }
-               }
-               throw new Exception("Could not open output stream for state 
backend", latestException);
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       private void checkFileSystemInitialized() throws IllegalStateException {
-               if (filesystem == null || checkpointDirectory == null) {
-                       throw new IllegalStateException("filesystem has not 
been re-initialized after deserialization");
-               }
-       }
-       
-       private Path createCheckpointDirPath(long checkpointID) {
-               return new Path(checkpointDirectory, "chk-" + checkpointID);
-       }
-       
-       @Override
-       public String toString() {
-               return checkpointDirectory == null ?
-                       "File State Backend @ " + basePath : 
-                       "File State Backend (initialized) @ " + 
checkpointDirectory;
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Output stream for state checkpointing
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A CheckpointStateOutputStream that writes into a file and returns 
the path to that file upon
-        * closing.
-        */
-       public static final class FsCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
-
-               private final FSDataOutputStream outStream;
-               
-               private final Path filePath;
-               
-               private final FileSystem fs;
-               
-               private boolean closed;
-
-               FsCheckpointStateOutputStream(FSDataOutputStream outStream, 
Path filePath, FileSystem fs) {
-                       this.outStream = outStream;
-                       this.filePath = filePath;
-                       this.fs = fs;
-               }
-
-
-               @Override
-               public void write(int b) throws IOException {
-                       outStream.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws 
IOException {
-                       outStream.write(b, off, len);
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       outStream.flush();
-               }
-
-               /**
-                * If the stream is only closed, we remove the produced file 
(cleanup through the auto close
-                * feature, for example). This method throws no exception if 
the deletion fails, but only
-                * logs the error.
-                */
-               @Override
-               public void close() {
-                       synchronized (this) {
-                               if (!closed) {
-                                       closed = true;
-                                       try {
-                                               outStream.close();
-                                               fs.delete(filePath, false);
-                                               
-                                               // attempt to delete the parent 
(will fail and be ignored if the parent has more files)
-                                               try {
-                                                       
fs.delete(filePath.getParent(), false);
-                                               } catch (IOException ignored) {}
-                                       }
-                                       catch (Exception e) {
-                                               LOG.warn("Cannot delete closed 
and discarded state stream to " + filePath, e);
-                                       }
-                               }
-                       }
-               }
-
-               @Override
-               public FileStreamStateHandle closeAndGetHandle() throws 
IOException {
-                       return new FileStreamStateHandle(closeAndGetPath());
-               }
-
-               /**
-                * Closes the stream and returns the path to the file that 
contains the stream's data.
-                * @return The path to the file that contains the stream's data.
-                * @throws IOException Thrown if the stream cannot be 
successfully closed.
-                */
-               public Path closeAndGetPath() throws IOException {
-                       synchronized (this) {
-                               if (!closed) {
-                                       closed = true;
-                                       outStream.close();
-                                       return filePath;
-                               }
-                               else {
-                                       throw new IOException("Stream has 
already been closed and discarded.");
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
deleted file mode 100644
index f0ad6bd..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-
-/**
- * A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
- * from a configuration.
- */
-public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-       
-       /** The key under which the config stores the directory where checkpoints should be stored */
-       public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-       
-       
-       @Override
-       public FsStateBackend createFromConfig(Configuration config) throws Exception {
-               String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-
-               if (checkpointDirURI == null) {
-                       throw new IllegalConfigurationException(
-                                       "Cannot create the file system state backend: The configuration does not specify the " +
-                                                       "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
-               }
-               
-               try {
-                       Path path = new Path(checkpointDirURI);
-                       return new FsStateBackend(path);
-               }
-               catch (IllegalArgumentException e) {
-                       throw new Exception("Cannot initialize File System State Backend with URI '"
-                                       + checkpointDirURI + '.', e);
-               }
-               
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
deleted file mode 100644
index 7952e58..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-/**
- * A state handle that contains stream state in a byte array.
- */
-public final class ByteStreamStateHandle implements StreamStateHandle {
-
-       private static final long serialVersionUID = -5280226231200217594L;
-       
-       /** the state data */
-       private final byte[] data;
-
-       /**
-        * Creates a new ByteStreamStateHandle containing the given data.
-        * 
-        * @param data The state data.
-        */
-       public ByteStreamStateHandle(byte[] data) {
-               this.data = data;
-       }
-
-       @Override
-       public InputStream getState(ClassLoader userCodeClassLoader) {
-               return new ByteArrayInputStream(data);
-       }
-
-       @Override
-       public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
deleted file mode 100644
index e611887..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
-       
-       public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
-               super(keySerializer, valueSerializer, defaultValue);
-       }
-
-       public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-                                                       V defaultValue, HashMap<K, V> state) {
-               super(keySerializer, valueSerializer, defaultValue, state);
-       }
-       
-       @Override
-       public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
-               DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
-               writeStateToOutputView(ser);
-               byte[] bytes = ser.getCopyOfBuffer();
-               
-               return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
deleted file mode 100644
index 7f50379..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.util.HashMap;
-
-/**
- * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte
- * array, in serialized form.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /** Name of the key serializer class */
-       private final String keySerializerClassName;
-
-       /** Name of the value serializer class */
-       private final String valueSerializerClassName;
-       
-       /** The serialized data of the state key/value pairs */
-       private final byte[] data;
-       
-       /** The number of key/value pairs */
-       private final int numEntries;
-
-       /**
-        * Creates a new heap memory state snapshot.
-        *
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param data The serialized data of the state key/value pairs
-        * @param numEntries The number of key/value pairs
-        */
-       public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
-                                               TypeSerializer<V> valueSerializer, byte[] data, int numEntries) {
-               this.keySerializerClassName = keySerializer.getClass().getName();
-               this.valueSerializerClassName = valueSerializer.getClass().getName();
-               this.data = data;
-               this.numEntries = numEntries;
-       }
-
-
-       @Override
-       public MemHeapKvState<K, V> restoreState(
-                       MemoryStateBackend stateBackend,
-                       final TypeSerializer<K> keySerializer,
-                       final TypeSerializer<V> valueSerializer,
-                       V defaultValue,
-                       ClassLoader classLoader) throws Exception {
-
-               // validity checks
-               if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-                       !valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-                               throw new IllegalArgumentException(
-                                               "Cannot restore the state from the snapshot with the given serializers. " +
-                                               "State (K/V) was serialized with (" + valueSerializerClassName + 
-                                               "/" + keySerializerClassName + ")");
-               }
-               
-               // restore state
-               HashMap<K, V> stateMap = new HashMap<>(numEntries);
-               DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length);
-               
-               for (int i = 0; i < numEntries; i++) {
-                       K key = keySerializer.deserialize(in);
-                       V value = valueSerializer.deserialize(in);
-                       stateMap.put(key, value);
-               }
-               
-               return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap);
-       }
-
-       /**
-        * Discarding the heap state is a no-op.
-        */
-       @Override
-       public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
deleted file mode 100644
index 05368bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred 
- */
-public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
-
-       private static final long serialVersionUID = 4109305377809414635L;
-       
-       /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
-       private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
-       
-       /** The maximal size that the snapshotted memory state may have */
-       private final int maxStateSize;
-
-       /**
-        * Creates a new memory state backend that accepts states whose serialized forms are
-        * up to the default state size (5 MB).
-        */
-       public MemoryStateBackend() {
-               this(DEFAULT_MAX_STATE_SIZE);
-       }
-
-       /**
-        * Creates a new memory state backend that accepts states whose serialized forms are
-        * up to the given number of bytes.
-        * 
-        * @param maxStateSize The maximal size of the serialized state
-        */
-       public MemoryStateBackend(int maxStateSize) {
-               this.maxStateSize = maxStateSize;
-       }
-
-       // ------------------------------------------------------------------------
-       //  initialization and cleanup
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void initializeForJob(JobID job) {
-               // nothing to do here
-       }
-
-       @Override
-       public void disposeAllStateForCurrentJob() {
-               // nothing to do here, GC will do it
-       }
-
-       @Override
-       public void close() throws Exception {}
-
-       // ------------------------------------------------------------------------
-       //  State backend operations
-       // ------------------------------------------------------------------------
-       
-       @Override
-       public <K, V> MemHeapKvState<K, V> createKvState(
-                       TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
-               return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
-       }
-       
-       /**
-        * Serialized the given state into bytes using Java serialization and creates a state handle that
-        * can re-create that state.
-        * 
-        * @param state The state to checkpoint.
-        * @param checkpointID The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        * @param <S> The type of the state.
-        * 
-        * @return A state handle that contains the given state serialized as bytes.
-        * @throws Exception Thrown, if the serialization fails.
-        */
-       @Override
-       public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-                       S state, long checkpointID, long timestamp) throws Exception
-       {
-               SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
-               checkSize(handle.getSizeOfSerializedState(), maxStateSize);
-               return new SerializedStateHandle<S>(state);
-       }
-
-       @Override
-       public CheckpointStateOutputStream createCheckpointStateOutputStream(
-                       long checkpointID, long timestamp) throws Exception
-       {
-               return new MemoryCheckpointOutputStream(maxStateSize);
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
-       }
-
-       static void checkSize(int size, int maxSize) throws IOException {
-               if (size > maxSize) {
-                       throw new IOException(
-                                       "Size of the state is larger than the maximum permitted memory-backed state. Size="
-                                                       + size + " , maxSize=" + maxSize
-                                                       + " . Consider using a different state backend, like the File System State backend.");
-               }
-       }
-       
-       // ------------------------------------------------------------------------
-
-       /**
-        * A CheckpointStateOutputStream that writes into a byte array.
-        */
-       public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
-               
-               private final ByteArrayOutputStream os = new ByteArrayOutputStream();
-               
-               private final int maxSize;
-               
-               private boolean closed;
-
-               public MemoryCheckpointOutputStream(int maxSize) {
-                       this.maxSize = maxSize;
-               }
-
-               @Override
-               public void write(int b) {
-                       os.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) {
-                       os.write(b, off, len);
-               }
-
-               // --------------------------------------------------------------------
-
-               @Override
-               public void close() {
-                       closed = true;
-                       os.reset();
-               }
-
-               @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException {
-                       return new ByteStreamStateHandle(closeAndGetBytes());
-               }
-
-               /**
-                * Closes the stream and returns the byte array containing the stream's data.
-                * @return The byte array containing the stream's data.
-                * @throws IOException Thrown if the size of the data exceeds the maximal 
-                */
-               public byte[] closeAndGetBytes() throws IOException {
-                       if (!closed) {
-                               checkSize(os.size(), maxSize);
-                               byte[] bytes = os.toByteArray();
-                               close();
-                               return bytes;
-                       }
-                       else {
-                               throw new IllegalStateException("stream has already been closed");
-                       }
-               }
-       }
-       
-       // ------------------------------------------------------------------------
-       //  Static default instance
-       // ------------------------------------------------------------------------
-       
-       /** The default instance of this state backend, using the default maximal state size */
-       private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
-
-       /**
-        * Gets the default instance of this state backend, using the default maximal state size.
-        * @return The default instance of this state backend.
-        */
-       public static MemoryStateBackend defaultInstance() {
-               return DEFAULT_INSTANCE;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 163cadd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * @param <T> The type of state represented by this state handle.
- */
-public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> {
-       
-       private static final long serialVersionUID = 4145685722538475769L;
-
-       public SerializedStateHandle(T value) throws IOException {
-               super(value);
-       }
-       
-       @Override
-       public T getState(ClassLoader classLoader) throws Exception {
-               return deserializeValue(classLoader);
-       }
-
-       /**
-        * Discarding heap-memory backed state is a no-op, so this method does nothing.
-        */
-       @Override
-       public void discardState() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index cf8575e..9964760 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 72a8c25..8c58e29 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,17 +30,16 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -493,55 +492,52 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
        
        private StateBackend<?> createStateBackend() throws Exception {
                StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
-               
+
                if (configuredBackend != null) {
                        // backend has been configured on the environment
                        LOG.info("Using user-defined state backend: " + configuredBackend);
                        return configuredBackend;
-               }
-               else {
+               } else {
                        // see if we have a backend specified in the configuration
                        Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
                        String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
-                       
+
                        if (backendName == null) {
                                LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
                                backendName = "jobmanager";
                        }
-                       
+
                        backendName = backendName.toLowerCase();
                        switch (backendName) {
                                case "jobmanager":
                                        LOG.info("State backend is set to heap 
memory (checkpoint to jobmanager)");
                                        return 
MemoryStateBackend.defaultInstance();
-                               
+
                                case "filesystem":
                                        FsStateBackend backend = new 
FsStateBackendFactory().createFromConfig(flinkConfig);
                                        LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \""
-                                                       + backend.getBasePath() 
+ "\")");
+                                               + backend.getBasePath() + 
"\")");
                                        return backend;
-                               
+
                                default:
                                        try {
                                                @SuppressWarnings("rawtypes")
                                                Class<? extends 
StateBackendFactory> clazz =
-                                                               
Class.forName(backendName, false, 
userClassLoader).asSubclass(StateBackendFactory.class);
+                                                       
Class.forName(backendName, false, 
userClassLoader).asSubclass(StateBackendFactory.class);
 
                                                return (StateBackend<?>) 
clazz.newInstance();
-                                       }
-                                       catch (ClassNotFoundException e) {
+                                       } catch (ClassNotFoundException e) {
                                                throw new 
IllegalConfigurationException("Cannot find configured state backend: " + 
backendName);
-                                       }
-                                       catch (ClassCastException e) {
+                                       } catch (ClassCastException e) {
                                                throw new 
IllegalConfigurationException("The class configured under '" +
-                                                               
ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
-                                                               backendName + 
')');
-                                       }
-                                       catch (Throwable t) {
+                                                       
ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+                                                       backendName + ')');
+                                       } catch (Throwable t) {
                                                throw new 
IllegalConfigurationException("Cannot create configured state backend", t);
                                        }
                        }
                }
+       }
 
        /**
         * Registers a timer.

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 334fd44..afeabd9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 
 import java.io.Serializable;
 import java.util.ConcurrentModificationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
deleted file mode 100644
index 73100d1..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class FileStateBackendTest {
-       
-       @Test
-       public void testSetupAndSerialization() {
-               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
-               try {
-                       final String backendDir = localFileUri(tempDir);
-                       FsStateBackend originalBackend = new 
FsStateBackend(backendDir);
-                       
-                       assertFalse(originalBackend.isInitialized());
-                       assertEquals(new URI(backendDir), 
originalBackend.getBasePath().toUri());
-                       assertNull(originalBackend.getCheckpointDirectory());
-                       
-                       // serialize / copy the backend
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(originalBackend);
-                       assertFalse(backend.isInitialized());
-                       assertEquals(new URI(backendDir), 
backend.getBasePath().toUri());
-                       assertNull(backend.getCheckpointDirectory());
-                       
-                       // no file operations should be possible right now
-                       try {
-                               backend.checkpointStateSerializable("exception 
train rolling in", 2L, System.currentTimeMillis());
-                               fail("should fail with an exception");
-                       } catch (IllegalStateException e) {
-                               // supreme!
-                       }
-                       
-                       backend.initializeForJob(new JobID());
-                       assertNotNull(backend.getCheckpointDirectory());
-                       
-                       File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
-                       assertTrue(checkpointDir.exists());
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-                       
-                       backend.disposeAllStateForCurrentJob();
-                       assertNull(backend.getCheckpointDirectory());
-                       
-                       assertTrue(isDirectoryEmpty(tempDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-       
-       @Test
-       public void testSerializableState() {
-               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
-               try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
-                       backend.initializeForJob(new JobID());
-
-                       File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
-
-                       String state1 = "dummy state";
-                       String state2 = "row row row your boat";
-                       Integer state3 = 42;
-                       
-                       StateHandle<String> handle1 = 
backend.checkpointStateSerializable(state1, 439568923746L, 
System.currentTimeMillis());
-                       StateHandle<String> handle2 = 
backend.checkpointStateSerializable(state2, 439568923746L, 
System.currentTimeMillis());
-                       StateHandle<Integer> handle3 = 
backend.checkpointStateSerializable(state3, 439568923746L, 
System.currentTimeMillis());
-
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       assertEquals(state1, 
handle1.getState(getClass().getClassLoader()));
-                       handle1.discardState();
-                       
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       assertEquals(state2, 
handle2.getState(getClass().getClassLoader()));
-                       handle2.discardState();
-                       
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       assertEquals(state3, 
handle3.getState(getClass().getClassLoader()));
-                       handle3.discardState();
-                       
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-
-       @Test
-       public void testStateOutputStream() {
-               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
-               try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
-                       backend.initializeForJob(new JobID());
-
-                       File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
-
-                       byte[] state1 = new byte[1274673];
-                       byte[] state2 = new byte[1];
-                       byte[] state3 = new byte[0];
-                       byte[] state4 = new byte[177];
-                       
-                       Random rnd = new Random();
-                       rnd.nextBytes(state1);
-                       rnd.nextBytes(state2);
-                       rnd.nextBytes(state3);
-                       rnd.nextBytes(state4);
-
-                       long checkpointId = 97231523452L;
-
-                       FsStateBackend.FsCheckpointStateOutputStream stream1 = 
-                                       
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
-                       FsStateBackend.FsCheckpointStateOutputStream stream2 =
-                                       
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
-                       FsStateBackend.FsCheckpointStateOutputStream stream3 =
-                                       
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
-                       
-                       stream1.write(state1);
-                       stream2.write(state2);
-                       stream3.write(state3);
-                       
-                       FileStreamStateHandle handle1 = 
stream1.closeAndGetHandle();
-                       FileStreamStateHandle handle2 = 
stream2.closeAndGetHandle();
-                       FileStreamStateHandle handle3 = 
stream3.closeAndGetHandle();
-                       
-                       // use with try-with-resources
-                       StreamStateHandle handle4;
-                       try (StateBackend.CheckpointStateOutputStream stream4 =
-                                       
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis())) {
-                               stream4.write(state4);
-                               handle4 = stream4.closeAndGetHandle();
-                       }
-                       
-                       // close before accessing handle
-                       StateBackend.CheckpointStateOutputStream stream5 =
-                                       
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
-                       stream5.write(state4);
-                       stream5.close();
-                       try {
-                               stream5.closeAndGetHandle();
-                               fail();
-                       } catch (IOException e) {
-                               // uh-huh
-                       }
-                       
-                       
validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
-                       handle1.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureLocalFileDeleted(handle1.getFilePath());
-                       
-                       
validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
-                       handle2.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureLocalFileDeleted(handle2.getFilePath());
-                       
-                       
validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
-                       handle3.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureLocalFileDeleted(handle3.getFilePath());
-                       
-                       
validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
-                       handle4.discardState();
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-
-       @Test
-       public void testKeyValueState() {
-               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
-               try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
-                       backend.initializeForJob(new JobID());
-
-                       File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
-
-                       KvState<Integer, String, FsStateBackend> kv =
-                                       
backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
-                       assertEquals(0, kv.size());
-
-                       // some modifications to the state
-                       kv.setCurrentKey(1);
-                       assertNull(kv.value());
-                       kv.update("1");
-                       assertEquals(1, kv.size());
-                       kv.setCurrentKey(2);
-                       assertNull(kv.value());
-                       kv.update("2");
-                       assertEquals(2, kv.size());
-                       kv.setCurrentKey(1);
-                       assertEquals("1", kv.value());
-                       assertEquals(2, kv.size());
-
-                       // draw a snapshot
-                       KvStateSnapshot<Integer, String, FsStateBackend> 
snapshot1 =
-                                       kv.shapshot(682375462378L, 
System.currentTimeMillis());
-
-                       // make some more modifications
-                       kv.setCurrentKey(1);
-                       kv.update("u1");
-                       kv.setCurrentKey(2);
-                       kv.update("u2");
-                       kv.setCurrentKey(3);
-                       kv.update("u3");
-
-                       // draw another snapshot
-                       KvStateSnapshot<Integer, String, FsStateBackend> 
snapshot2 =
-                                       kv.shapshot(682375462379L, 
System.currentTimeMillis());
-
-                       // validate the original state
-                       assertEquals(3, kv.size());
-                       kv.setCurrentKey(1);
-                       assertEquals("u1", kv.value());
-                       kv.setCurrentKey(2);
-                       assertEquals("u2", kv.value());
-                       kv.setCurrentKey(3);
-                       assertEquals("u3", kv.value());
-
-                       // restore the first snapshot and validate it
-                       KvState<Integer, String, FsStateBackend> restored1 = 
snapshot1.restoreState(backend,
-                                       IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-                       assertEquals(2, restored1.size());
-                       restored1.setCurrentKey(1);
-                       assertEquals("1", restored1.value());
-                       restored1.setCurrentKey(2);
-                       assertEquals("2", restored1.value());
-
-                       // restore the second snapshot and validate it
-                       KvState<Integer, String, FsStateBackend> restored2 = 
snapshot2.restoreState(backend,
-                                       IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-                       assertEquals(3, restored2.size());
-                       restored2.setCurrentKey(1);
-                       assertEquals("u1", restored2.value());
-                       restored2.setCurrentKey(2);
-                       assertEquals("u2", restored2.value());
-                       restored2.setCurrentKey(3);
-                       assertEquals("u3", restored2.value());
-
-                       snapshot1.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-
-                       snapshot2.discardState();
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-
-       @Test
-       public void testRestoreWithWrongSerializers() {
-               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
-               try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
-                       backend.initializeForJob(new JobID());
-
-                       File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
-                       
-                       KvState<Integer, String, FsStateBackend> kv =
-                                       
backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
-                       kv.setCurrentKey(1);
-                       kv.update("1");
-                       kv.setCurrentKey(2);
-                       kv.update("2");
-
-                       KvStateSnapshot<Integer, String, FsStateBackend> 
snapshot =
-                                       kv.shapshot(682375462378L, 
System.currentTimeMillis());
-
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<Integer> fakeIntSerializer =
-                                       (TypeSerializer<Integer>) 
(TypeSerializer<?>) FloatSerializer.INSTANCE;
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<String> fakeStringSerializer =
-                                       (TypeSerializer<String>) 
(TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
-                       try {
-                               snapshot.restoreState(backend, 
fakeIntSerializer,
-                                               StringSerializer.INSTANCE, 
null, getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-
-                       try {
-                               snapshot.restoreState(backend, 
IntSerializer.INSTANCE,
-                                               fakeStringSerializer, null, 
getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-
-                       try {
-                               snapshot.restoreState(backend, 
fakeIntSerializer,
-                                               fakeStringSerializer, null, 
getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-                       
-                       snapshot.discardState();
-
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-       
-       private static void ensureLocalFileDeleted(Path path) {
-               URI uri = path.toUri();
-               if ("file".equals(uri.getScheme())) {
-                       File file = new File(uri.getPath());
-                       assertFalse("file not properly deleted", file.exists());
-               }
-               else {
-                       throw new IllegalArgumentException("not a local path");
-               }
-       }
-       
-       private static void deleteDirectorySilently(File dir) {
-               try {
-                       FileUtils.deleteDirectory(dir);
-               }
-               catch (IOException ignored) {}
-       }
-       
-       private static boolean isDirectoryEmpty(File directory) {
-               String[] nested = directory.list();
-               return  nested == null || nested.length == 0;
-       }
-       
-       private static String localFileUri(File path) {
-               return (OperatingSystem.isWindows() ? "file:/" : "file://") + 
path.getAbsolutePath();
-       }
-       
-       private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {
-               byte[] holder = new byte[data.length];
-               assertEquals("not enough data", holder.length, is.read(holder));
-               assertEquals("too much data", -1, is.read());
-               assertArrayEquals("wrong data", data, holder);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
deleted file mode 100644
index 3410d09..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link 
org.apache.flink.streaming.api.state.memory.MemoryStateBackend}.
- */
-public class MemoryStateBackendTest {
-       
-       @Test
-       public void testSerializableState() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend();
-
-                       HashMap<String, Integer> state = new HashMap<>();
-                       state.put("hey there", 2);
-                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
-                       
-                       StateHandle<HashMap<String, Integer>> handle = 
backend.checkpointStateSerializable(state, 12, 459);
-                       assertNotNull(handle);
-                       
-                       HashMap<String, Integer> restored = 
handle.getState(getClass().getClassLoader());
-                       assertEquals(state, restored);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testOversizedState() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend(10);
-
-                       HashMap<String, Integer> state = new HashMap<>();
-                       state.put("hey there", 2);
-                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
-
-                       try {
-                               backend.checkpointStateSerializable(state, 12, 
459);
-                               fail("this should cause an exception");
-                       }
-                       catch (IOException e) {
-                               // now darling, isn't that exactly what we 
wanted?
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testStateStream() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend();
-
-                       HashMap<String, Integer> state = new HashMap<>();
-                       state.put("hey there", 2);
-                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
-
-                       StateBackend.CheckpointStateOutputStream os = 
backend.createCheckpointStateOutputStream(1, 2);
-                       ObjectOutputStream oos = new ObjectOutputStream(os);
-                       oos.writeObject(state);
-                       oos.flush();
-                       StreamStateHandle handle = os.closeAndGetHandle();
-                       
-                       assertNotNull(handle);
-
-                       ObjectInputStream ois = new 
ObjectInputStream(handle.getState(getClass().getClassLoader()));
-                       assertEquals(state, ois.readObject());
-                       assertTrue(ois.available() <= 0);
-                       ois.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testOversizedStateStream() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend(10);
-
-                       HashMap<String, Integer> state = new HashMap<>();
-                       state.put("hey there", 2);
-                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
-
-                       StateBackend.CheckpointStateOutputStream os = 
backend.createCheckpointStateOutputStream(1, 2);
-                       ObjectOutputStream oos = new ObjectOutputStream(os);
-                       
-                       try {
-                               oos.writeObject(state);
-                               oos.flush();
-                               os.closeAndGetHandle();
-                               fail("this should cause an exception");
-                       }
-                       catch (IOException e) {
-                               // oh boy! what an exception!
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testKeyValueState() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend();
-                       
-                       KvState<Integer, String, MemoryStateBackend> kv = 
-                                       
backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-                       
-                       assertEquals(0, kv.size());
-                       
-                       // some modifications to the state
-                       kv.setCurrentKey(1);
-                       assertNull(kv.value());
-                       kv.update("1");
-                       assertEquals(1, kv.size());
-                       kv.setCurrentKey(2);
-                       assertNull(kv.value());
-                       kv.update("2");
-                       assertEquals(2, kv.size());
-                       kv.setCurrentKey(1);
-                       assertEquals("1", kv.value());
-                       assertEquals(2, kv.size());
-                       
-                       // draw a snapshot
-                       KvStateSnapshot<Integer, String, MemoryStateBackend> 
snapshot1 = 
-                                       kv.shapshot(682375462378L, 
System.currentTimeMillis());
-                       
-                       // make some more modifications
-                       kv.setCurrentKey(1);
-                       kv.update("u1");
-                       kv.setCurrentKey(2);
-                       kv.update("u2");
-                       kv.setCurrentKey(3);
-                       kv.update("u3");
-
-                       // draw another snapshot
-                       KvStateSnapshot<Integer, String, MemoryStateBackend> 
snapshot2 =
-                                       kv.shapshot(682375462379L, 
System.currentTimeMillis());
-                       
-                       // validate the original state
-                       assertEquals(3, kv.size());
-                       kv.setCurrentKey(1);
-                       assertEquals("u1", kv.value());
-                       kv.setCurrentKey(2);
-                       assertEquals("u2", kv.value());
-                       kv.setCurrentKey(3);
-                       assertEquals("u3", kv.value());
-                       
-                       // restore the first snapshot and validate it
-                       KvState<Integer, String, MemoryStateBackend> restored1 
= snapshot1.restoreState(backend, 
-                                                       IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-                       assertEquals(2, restored1.size());
-                       restored1.setCurrentKey(1);
-                       assertEquals("1", restored1.value());
-                       restored1.setCurrentKey(2);
-                       assertEquals("2", restored1.value());
-
-                       // restore the first snapshot and validate it
-                       KvState<Integer, String, MemoryStateBackend> restored2 
= snapshot2.restoreState(backend,
-                                       IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
-                       assertEquals(3, restored2.size());
-                       restored2.setCurrentKey(1);
-                       assertEquals("u1", restored2.value());
-                       restored2.setCurrentKey(2);
-                       assertEquals("u2", restored2.value());
-                       restored2.setCurrentKey(3);
-                       assertEquals("u3", restored2.value());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testRestoreWithWrongSerializers() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend();
-                       KvState<Integer, String, MemoryStateBackend> kv =
-                                       
backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-                       
-                       kv.setCurrentKey(1);
-                       kv.update("1");
-                       kv.setCurrentKey(2);
-                       kv.update("2");
-                       
-                       KvStateSnapshot<Integer, String, MemoryStateBackend> 
snapshot =
-                                       kv.shapshot(682375462378L, 
System.currentTimeMillis());
-
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<Integer> fakeIntSerializer = 
-                                       (TypeSerializer<Integer>) 
(TypeSerializer<?>) FloatSerializer.INSTANCE;
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializer<String> fakeStringSerializer = 
-                                       (TypeSerializer<String>) 
(TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
-                       try {
-                               snapshot.restoreState(backend, 
fakeIntSerializer,
-                                               StringSerializer.INSTANCE, 
null, getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-
-                       try {
-                               snapshot.restoreState(backend, 
IntSerializer.INSTANCE,
-                                               fakeStringSerializer, null, 
getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-
-                       try {
-                               snapshot.restoreState(backend, 
fakeIntSerializer,
-                                               fakeStringSerializer, null, 
getClass().getClassLoader());
-                               fail("should recognize wrong serializers");
-                       } catch (IllegalArgumentException e) {
-                               // expected
-                       } catch (Exception e) {
-                               fail("wrong exception");
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index dd76a67..ad3c838 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index ab8e551..4bd260f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 81d3a69..0c708c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -36,8 +36,8 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index b83feca..01f95bc 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,8 +30,8 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

Reply via email to