http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
new file mode 100644
index 0000000..481fb98
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class FileStateBackendTest {
+       
+       /**
+        * Walks a file system state backend through its life cycle: while it is not bound to a
+        * job, both the original and the copy produced by the test utility report no checkpoint
+        * directory and reject checkpoint calls; after {@code initializeForJob} an empty
+        * checkpoint directory exists; after {@code disposeAllStateForCurrentJob} the backend
+        * reports no checkpoint directory and the base directory holds no entries.
+        */
+       @Test
+       public void testSetupAndSerialization() {
+               final File baseDir = new File(
+                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               try {
+                       final String baseUri = localFileUri(baseDir);
+
+                       // a freshly constructed backend is not bound to any job
+                       final FsStateBackend original = new FsStateBackend(baseUri);
+                       assertFalse(original.isInitialized());
+                       assertEquals(new URI(baseUri), original.getBasePath().toUri());
+                       assertNull(original.getCheckpointDirectory());
+
+                       // the serialized copy must look exactly like the original
+                       final FsStateBackend copy = CommonTestUtils.createCopySerializable(original);
+                       assertFalse(copy.isInitialized());
+                       assertEquals(new URI(baseUri), copy.getBasePath().toUri());
+                       assertNull(copy.getCheckpointDirectory());
+
+                       // checkpointing has to be rejected while the backend is uninitialized
+                       try {
+                               copy.checkpointStateSerializable(
+                                               "exception train rolling in", 2L, System.currentTimeMillis());
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalStateException expected) {
+                               // exactly the outcome we are looking for
+                       }
+
+                       // binding the backend to a job yields an existing, empty checkpoint directory
+                       copy.initializeForJob(new JobID());
+                       assertNotNull(copy.getCheckpointDirectory());
+
+                       final File jobCheckpointDir = new File(copy.getCheckpointDirectory().toUri().getPath());
+                       assertTrue(jobCheckpointDir.exists());
+                       assertTrue(isDirectoryEmpty(jobCheckpointDir));
+
+                       // disposing the job's state must leave nothing behind
+                       copy.disposeAllStateForCurrentJob();
+                       assertNull(copy.getCheckpointDirectory());
+                       assertTrue(isDirectoryEmpty(baseDir));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(baseDir);
+               }
+       }
+       
+       /**
+        * Checkpoints three serializable values, reads each one back through its handle and
+        * discards the handles one by one. The checkpoint directory must stay non-empty until
+        * the last handle has been discarded.
+        */
+       @Test
+       public void testSerializableState() {
+               final File baseDir = new File(
+                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               try {
+                       final FsStateBackend backend = CommonTestUtils.createCopySerializable(
+                                       new FsStateBackend(localFileUri(baseDir)));
+                       backend.initializeForJob(new JobID());
+
+                       final File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+                       final ClassLoader userLoader = getClass().getClassLoader();
+
+                       final String firstValue = "dummy state";
+                       final String secondValue = "row row row your boat";
+                       final Integer thirdValue = 42;
+
+                       // all three values are written under the same checkpoint id
+                       final StateHandle<String> firstHandle = backend.checkpointStateSerializable(
+                                       firstValue, 439568923746L, System.currentTimeMillis());
+                       final StateHandle<String> secondHandle = backend.checkpointStateSerializable(
+                                       secondValue, 439568923746L, System.currentTimeMillis());
+                       final StateHandle<Integer> thirdHandle = backend.checkpointStateSerializable(
+                                       thirdValue, 439568923746L, System.currentTimeMillis());
+
+                       // read back and discard, one handle at a time
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       assertEquals(firstValue, firstHandle.getState(userLoader));
+                       firstHandle.discardState();
+
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       assertEquals(secondValue, secondHandle.getState(userLoader));
+                       secondHandle.discardState();
+
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       assertEquals(thirdValue, thirdHandle.getState(userLoader));
+                       thirdHandle.discardState();
+
+                       // nothing may be left once every handle is gone
+                       assertTrue(isDirectoryEmpty(checkpointDir));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(baseDir);
+               }
+       }
+
+       /**
+        * Writes byte payloads of different sizes (large, one byte, empty, small) through
+        * checkpoint output streams, checks that a stream closed via {@code close()} no longer
+        * hands out a state handle, and verifies that each handle returns the written bytes and
+        * that discarding a handle deletes its file.
+        */
+       @Test
+       public void testStateOutputStream() {
+               final File baseDir = new File(
+                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               try {
+                       final FsStateBackend backend = CommonTestUtils.createCopySerializable(
+                                       new FsStateBackend(localFileUri(baseDir)));
+                       backend.initializeForJob(new JobID());
+
+                       final File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+                       final ClassLoader userLoader = getClass().getClassLoader();
+
+                       final byte[] largePayload = new byte[1274673];
+                       final byte[] singleBytePayload = new byte[1];
+                       final byte[] emptyPayload = new byte[0];
+                       final byte[] smallPayload = new byte[177];
+
+                       final Random rnd = new Random();
+                       rnd.nextBytes(largePayload);
+                       rnd.nextBytes(singleBytePayload);
+                       rnd.nextBytes(emptyPayload);
+                       rnd.nextBytes(smallPayload);
+
+                       final long checkpointId = 97231523452L;
+
+                       // open three streams first, then write, then close them in the same order
+                       final FsStateBackend.FsCheckpointStateOutputStream largeOut =
+                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+                       final FsStateBackend.FsCheckpointStateOutputStream singleByteOut =
+                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+                       final FsStateBackend.FsCheckpointStateOutputStream emptyOut =
+                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+                       largeOut.write(largePayload);
+                       singleByteOut.write(singleBytePayload);
+                       emptyOut.write(emptyPayload);
+
+                       final FileStreamStateHandle largeHandle = largeOut.closeAndGetHandle();
+                       final FileStreamStateHandle singleByteHandle = singleByteOut.closeAndGetHandle();
+                       final FileStreamStateHandle emptyHandle = emptyOut.closeAndGetHandle();
+
+                       // the stream must also be usable inside a try-with-resources statement
+                       final StreamStateHandle smallHandle;
+                       try (StateBackend.CheckpointStateOutputStream smallOut =
+                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+                               smallOut.write(smallPayload);
+                               smallHandle = smallOut.closeAndGetHandle();
+                       }
+
+                       // a stream that was plainly closed must refuse to produce a handle
+                       final StateBackend.CheckpointStateOutputStream abandonedOut =
+                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+                       abandonedOut.write(smallPayload);
+                       abandonedOut.close();
+                       try {
+                               abandonedOut.closeAndGetHandle();
+                               fail();
+                       }
+                       catch (IOException expected) {
+                               // the handle is gone once the stream was closed
+                       }
+
+                       // read back each payload, discard it, and check that its file disappeared
+                       validateBytesInStream(largeHandle.getState(userLoader), largePayload);
+                       largeHandle.discardState();
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       ensureLocalFileDeleted(largeHandle.getFilePath());
+
+                       validateBytesInStream(singleByteHandle.getState(userLoader), singleBytePayload);
+                       singleByteHandle.discardState();
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       ensureLocalFileDeleted(singleByteHandle.getFilePath());
+
+                       validateBytesInStream(emptyHandle.getState(userLoader), emptyPayload);
+                       emptyHandle.discardState();
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       ensureLocalFileDeleted(emptyHandle.getFilePath());
+
+                       // after the last handle is discarded the directory has to be empty
+                       validateBytesInStream(smallHandle.getState(userLoader), smallPayload);
+                       smallHandle.discardState();
+                       assertTrue(isDirectoryEmpty(checkpointDir));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(baseDir);
+               }
+       }
+
+       /**
+        * Modifies a key/value state, draws two snapshots at different points in time and checks
+        * that the live state as well as both restored states contain the expected entries.
+        * Finally verifies that the checkpoint directory is empty only after both snapshots have
+        * been discarded.
+        */
+       @Test
+       public void testKeyValueState() {
+               final File baseDir = new File(
+                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               try {
+                       final FsStateBackend backend = CommonTestUtils.createCopySerializable(
+                                       new FsStateBackend(localFileUri(baseDir)));
+                       backend.initializeForJob(new JobID());
+
+                       final File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+                       final ClassLoader userLoader = getClass().getClassLoader();
+
+                       final KvState<Integer, String, FsStateBackend> state =
+                                       backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+                       assertEquals(0, state.size());
+
+                       // first round of updates: keys 1 and 2
+                       state.setCurrentKey(1);
+                       assertNull(state.value());
+                       state.update("1");
+                       assertEquals(1, state.size());
+                       state.setCurrentKey(2);
+                       assertNull(state.value());
+                       state.update("2");
+                       assertEquals(2, state.size());
+                       state.setCurrentKey(1);
+                       assertEquals("1", state.value());
+                       assertEquals(2, state.size());
+
+                       // first snapshot: holds {1 -> "1", 2 -> "2"}
+                       final KvStateSnapshot<Integer, String, FsStateBackend> firstSnapshot =
+                                       state.shapshot(682375462378L, System.currentTimeMillis());
+
+                       // second round of updates: overwrite keys 1 and 2, add key 3
+                       state.setCurrentKey(1);
+                       state.update("u1");
+                       state.setCurrentKey(2);
+                       state.update("u2");
+                       state.setCurrentKey(3);
+                       state.update("u3");
+
+                       // second snapshot: holds {1 -> "u1", 2 -> "u2", 3 -> "u3"}
+                       final KvStateSnapshot<Integer, String, FsStateBackend> secondSnapshot =
+                                       state.shapshot(682375462379L, System.currentTimeMillis());
+
+                       // the live state reflects the second round of updates
+                       assertEquals(3, state.size());
+                       state.setCurrentKey(1);
+                       assertEquals("u1", state.value());
+                       state.setCurrentKey(2);
+                       assertEquals("u2", state.value());
+                       state.setCurrentKey(3);
+                       assertEquals("u3", state.value());
+
+                       // restoring the first snapshot yields the values of the first round
+                       final KvState<Integer, String, FsStateBackend> fromFirst = firstSnapshot.restoreState(
+                                       backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, userLoader);
+
+                       assertEquals(2, fromFirst.size());
+                       fromFirst.setCurrentKey(1);
+                       assertEquals("1", fromFirst.value());
+                       fromFirst.setCurrentKey(2);
+                       assertEquals("2", fromFirst.value());
+
+                       // restoring the second snapshot yields the values of the second round
+                       final KvState<Integer, String, FsStateBackend> fromSecond = secondSnapshot.restoreState(
+                                       backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, userLoader);
+
+                       assertEquals(3, fromSecond.size());
+                       fromSecond.setCurrentKey(1);
+                       assertEquals("u1", fromSecond.value());
+                       fromSecond.setCurrentKey(2);
+                       assertEquals("u2", fromSecond.value());
+                       fromSecond.setCurrentKey(3);
+                       assertEquals("u3", fromSecond.value());
+
+                       // files remain until the last snapshot has been discarded
+                       firstSnapshot.discardState();
+                       assertFalse(isDirectoryEmpty(checkpointDir));
+
+                       secondSnapshot.discardState();
+                       assertTrue(isDirectoryEmpty(checkpointDir));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(baseDir);
+               }
+       }
+
+       /**
+        * Draws a snapshot of an (Integer, String) key/value state and checks that restoring it
+        * with a mismatching key serializer, a mismatching value serializer, or both, is
+        * rejected with an {@link IllegalArgumentException}.
+        */
+       @Test
+       public void testRestoreWithWrongSerializers() {
+               final File baseDir = new File(
+                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               try {
+                       final FsStateBackend backend = CommonTestUtils.createCopySerializable(
+                                       new FsStateBackend(localFileUri(baseDir)));
+                       backend.initializeForJob(new JobID());
+
+                       final File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+                       final KvState<Integer, String, FsStateBackend> state =
+                                       backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+                       state.setCurrentKey(1);
+                       state.update("1");
+                       state.setCurrentKey(2);
+                       state.update("2");
+
+                       final KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
+                                       state.shapshot(682375462378L, System.currentTimeMillis());
+
+                       // serializers of the wrong kind, forced into the expected generic type
+                       @SuppressWarnings("unchecked")
+                       final TypeSerializer<Integer> wrongKeySerializer =
+                                       (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+                       @SuppressWarnings("unchecked")
+                       final TypeSerializer<String> wrongValueSerializer =
+                                       (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+                       // wrong key serializer, wrong value serializer, then both wrong
+                       assertRestoreRejected(snapshot, backend, wrongKeySerializer, StringSerializer.INSTANCE);
+                       assertRestoreRejected(snapshot, backend, IntSerializer.INSTANCE, wrongValueSerializer);
+                       assertRestoreRejected(snapshot, backend, wrongKeySerializer, wrongValueSerializer);
+
+                       snapshot.discardState();
+
+                       assertTrue(isDirectoryEmpty(checkpointDir));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       deleteDirectorySilently(baseDir);
+               }
+       }
+
+       /**
+        * Attempts to restore the snapshot with the given serializers and fails the test unless
+        * the attempt is rejected with an {@link IllegalArgumentException}.
+        */
+       private void assertRestoreRejected(
+                       KvStateSnapshot<Integer, String, FsStateBackend> snapshot,
+                       FsStateBackend backend,
+                       TypeSerializer<Integer> keySerializer,
+                       TypeSerializer<String> valueSerializer) {
+               try {
+                       snapshot.restoreState(
+                                       backend, keySerializer, valueSerializer, null, getClass().getClassLoader());
+                       fail("should recognize wrong serializers");
+               }
+               catch (IllegalArgumentException expected) {
+                       // the mismatch was detected
+               }
+               catch (Exception e) {
+                       fail("wrong exception");
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Asserts that the file behind the given path no longer exists.
+        *
+        * @param path the path to check; its URI scheme must be {@code file}
+        * @throws IllegalArgumentException if the path's URI scheme is not {@code file}
+        */
+       private static void ensureLocalFileDeleted(Path path) {
+               final URI location = path.toUri();
+               if (!"file".equals(location.getScheme())) {
+                       throw new IllegalArgumentException("not a local path");
+               }
+
+               final File localFile = new File(location.getPath());
+               assertFalse("file not properly deleted", localFile.exists());
+       }
+       
+       /**
+        * Recursively deletes the given directory as best-effort cleanup; an {@link IOException}
+        * raised while deleting is ignored.
+        *
+        * @param dir the directory to remove
+        */
+       private static void deleteDirectorySilently(File dir) {
+               try {
+                       FileUtils.deleteDirectory(dir);
+               }
+               catch (IOException ignored) {
+                       // cleanup only - a leftover temp directory must not turn into a test failure
+               }
+       }
+       
+       /**
+        * Tells whether the given directory has no entries. A path that cannot be listed
+        * ({@link File#list()} returns {@code null}) is reported as empty as well.
+        *
+        * @param directory the directory to inspect
+        * @return {@code true} if no entries could be listed, {@code false} otherwise
+        */
+       private static boolean isDirectoryEmpty(File directory) {
+               final String[] entries = directory.list();
+               if (entries == null) {
+                       return true;
+               }
+               return entries.length == 0;
+       }
+       
+       /**
+        * Builds a {@code file:} URI string for the absolute path of the given file. The prefix
+        * is {@code file:/} on Windows and {@code file://} on all other systems.
+        *
+        * @param path the local file or directory
+        * @return the URI string pointing to {@code path}
+        */
+       private static String localFileUri(File path) {
+               final String prefix;
+               if (OperatingSystem.isWindows()) {
+                       prefix = "file:/";
+               }
+               else {
+                       prefix = "file://";
+               }
+               return prefix + path.getAbsolutePath();
+       }
+       
+       /**
+        * Asserts that the stream yields exactly the bytes in {@code data}, nothing less and
+        * nothing more, and closes the stream afterwards.
+        *
+        * <p>The bytes are gathered in a loop because a single {@code read(byte[])} call may
+        * return fewer bytes than requested even though more data follows.
+        *
+        * @param is   the stream to drain; it is closed by this method in all cases
+        * @param data the expected stream contents
+        * @throws IOException if reading from or closing the stream fails
+        */
+       private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+               // closing here releases whatever resource backs the stream, so that the caller
+               // can discard (delete) the underlying state right afterwards
+               try (InputStream in = is) {
+                       final byte[] holder = new byte[data.length];
+
+                       // keep reading until the buffer is full or the stream ends prematurely
+                       int filled = 0;
+                       while (filled < holder.length) {
+                               final int numRead = in.read(holder, filled, holder.length - filled);
+                               if (numRead == -1) {
+                                       break;
+                               }
+                               filled += numRead;
+                       }
+
+                       assertEquals("not enough data", holder.length, filled);
+                       assertEquals("too much data", -1, in.read());
+                       assertArrayEquals("wrong data", data, holder);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
new file mode 100644
index 0000000..5f95b33
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend}.
+ */
+public class MemoryStateBackendTest {
+       
+       /**
+        * Checkpoints a serializable map with the memory state backend and checks that the
+        * returned handle gives back an equal map.
+        */
+       @Test
+       public void testSerializableState() {
+               try {
+                       final MemoryStateBackend backend = new MemoryStateBackend();
+
+                       final HashMap<String, Integer> original = new HashMap<>();
+                       original.put("hey there", 2);
+                       original.put(
+                                       "the crazy brown fox stumbles over a sentence that does not contain every letter",
+                                       77);
+
+                       final StateHandle<HashMap<String, Integer>> handle =
+                                       backend.checkpointStateSerializable(original, 12, 459);
+                       assertNotNull(handle);
+
+                       // what comes out of the handle must equal what went in
+                       final HashMap<String, Integer> roundTripped = handle.getState(getClass().getClassLoader());
+                       assertEquals(original, roundTripped);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Creates a memory state backend with a size argument of 10 and checks that
+        * checkpointing a larger serializable map is rejected with an {@link IOException}.
+        */
+       @Test
+       public void testOversizedState() {
+               try {
+                       final MemoryStateBackend tinyBackend = new MemoryStateBackend(10);
+
+                       final HashMap<String, Integer> tooLarge = new HashMap<>();
+                       tooLarge.put("hey there", 2);
+                       tooLarge.put(
+                                       "the crazy brown fox stumbles over a sentence that does not contain every letter",
+                                       77);
+
+                       try {
+                               tinyBackend.checkpointStateSerializable(tooLarge, 12, 459);
+                               fail("this should cause an exception");
+                       }
+                       catch (IOException expected) {
+                               // the oversized state was rejected, as intended
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Serializes a map into a checkpoint state output stream of the memory state backend
+        * and checks that the resulting handle's stream yields an equal map and no further
+        * data.
+        */
+       @Test
+       public void testStateStream() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend();
+
+                       HashMap<String, Integer> state = new HashMap<>();
+                       state.put("hey there", 2);
+                       state.put(
+                                       "the crazy brown fox stumbles over a sentence that does not contain every letter",
+                                       77);
+
+                       StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+                       // 'oos' is only flushed, not closed: closing it would also close 'os' before
+                       // the handle has been obtained from it
+                       ObjectOutputStream oos = new ObjectOutputStream(os);
+                       oos.writeObject(state);
+                       oos.flush();
+                       StreamStateHandle handle = os.closeAndGetHandle();
+
+                       assertNotNull(handle);
+
+                       // try-with-resources makes sure the input stream is closed even when one of
+                       // the assertions below fails
+                       try (ObjectInputStream ois =
+                                       new ObjectInputStream(handle.getState(getClass().getClassLoader()))) {
+                               assertEquals(state, ois.readObject());
+                               assertTrue(ois.available() <= 0);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Creates a memory state backend with a size argument of 10 and checks that writing a
+        * larger object through its checkpoint state output stream ends in an
+        * {@link IOException}.
+        */
+       @Test
+       public void testOversizedStateStream() {
+               try {
+                       final MemoryStateBackend tinyBackend = new MemoryStateBackend(10);
+
+                       final HashMap<String, Integer> tooLarge = new HashMap<>();
+                       tooLarge.put("hey there", 2);
+                       tooLarge.put(
+                                       "the crazy brown fox stumbles over a sentence that does not contain every letter",
+                                       77);
+
+                       final StateBackend.CheckpointStateOutputStream rawOut =
+                                       tinyBackend.createCheckpointStateOutputStream(1, 2);
+                       final ObjectOutputStream objectOut = new ObjectOutputStream(rawOut);
+
+                       // the failure may surface while writing, flushing, or fetching the handle
+                       try {
+                               objectOut.writeObject(tooLarge);
+                               objectOut.flush();
+                               rawOut.closeAndGetHandle();
+                               fail("this should cause an exception");
+                       }
+                       catch (IOException expected) {
+                               // the oversized state was rejected, as intended
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Modifies a key/value state of the memory state backend, draws two snapshots at
+        * different points in time and checks that the live state as well as both restored
+        * states contain the expected entries.
+        */
+       @Test
+       public void testKeyValueState() {
+               try {
+                       final MemoryStateBackend backend = new MemoryStateBackend();
+                       final ClassLoader userLoader = getClass().getClassLoader();
+
+                       final KvState<Integer, String, MemoryStateBackend> state =
+                                       backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+                       assertEquals(0, state.size());
+
+                       // first round of updates: keys 1 and 2
+                       state.setCurrentKey(1);
+                       assertNull(state.value());
+                       state.update("1");
+                       assertEquals(1, state.size());
+                       state.setCurrentKey(2);
+                       assertNull(state.value());
+                       state.update("2");
+                       assertEquals(2, state.size());
+                       state.setCurrentKey(1);
+                       assertEquals("1", state.value());
+                       assertEquals(2, state.size());
+
+                       // first snapshot: holds {1 -> "1", 2 -> "2"}
+                       final KvStateSnapshot<Integer, String, MemoryStateBackend> firstSnapshot =
+                                       state.shapshot(682375462378L, System.currentTimeMillis());
+
+                       // second round of updates: overwrite keys 1 and 2, add key 3
+                       state.setCurrentKey(1);
+                       state.update("u1");
+                       state.setCurrentKey(2);
+                       state.update("u2");
+                       state.setCurrentKey(3);
+                       state.update("u3");
+
+                       // second snapshot: holds {1 -> "u1", 2 -> "u2", 3 -> "u3"}
+                       final KvStateSnapshot<Integer, String, MemoryStateBackend> secondSnapshot =
+                                       state.shapshot(682375462379L, System.currentTimeMillis());
+
+                       // the live state reflects the second round of updates
+                       assertEquals(3, state.size());
+                       state.setCurrentKey(1);
+                       assertEquals("u1", state.value());
+                       state.setCurrentKey(2);
+                       assertEquals("u2", state.value());
+                       state.setCurrentKey(3);
+                       assertEquals("u3", state.value());
+
+                       // restoring the first snapshot yields the values of the first round
+                       final KvState<Integer, String, MemoryStateBackend> fromFirst = firstSnapshot.restoreState(
+                                       backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, userLoader);
+
+                       assertEquals(2, fromFirst.size());
+                       fromFirst.setCurrentKey(1);
+                       assertEquals("1", fromFirst.value());
+                       fromFirst.setCurrentKey(2);
+                       assertEquals("2", fromFirst.value());
+
+                       // restoring the second snapshot yields the values of the second round
+                       final KvState<Integer, String, MemoryStateBackend> fromSecond = secondSnapshot.restoreState(
+                                       backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, userLoader);
+
+                       assertEquals(3, fromSecond.size());
+                       fromSecond.setCurrentKey(1);
+                       assertEquals("u1", fromSecond.value());
+                       fromSecond.setCurrentKey(2);
+                       assertEquals("u2", fromSecond.value());
+                       fromSecond.setCurrentKey(3);
+                       assertEquals("u3", fromSecond.value());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Verifies that restoring a key/value state snapshot is rejected with an
+        * {@link IllegalArgumentException} when the key serializer, the value
+        * serializer, or both differ from the serializers the state was created with.
+        */
+       @Test
+       public void testRestoreWithWrongSerializers() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend();
+                       KvState<Integer, String, MemoryStateBackend> kv =
+                                       backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+                       // populate the state with two entries before snapshotting
+                       kv.setCurrentKey(1);
+                       kv.update("1");
+                       kv.setCurrentKey(2);
+                       kv.update("2");
+
+                       KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
+                                       kv.shapshot(682375462378L, System.currentTimeMillis());
+
+                       // serializers of a different type, force-cast so that they can be
+                       // passed where an Integer / String serializer is expected
+                       @SuppressWarnings("unchecked")
+                       TypeSerializer<Integer> fakeIntSerializer =
+                                       (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+                       @SuppressWarnings("unchecked")
+                       TypeSerializer<String> fakeStringSerializer =
+                                       (TypeSerializer<String>) (TypeSerializer<?>)
+                                                       new ValueSerializer<StringValue>(StringValue.class);
+
+                       // wrong key serializer only
+                       assertRestoreRejected(snapshot, backend, fakeIntSerializer, StringSerializer.INSTANCE);
+
+                       // wrong value serializer only
+                       assertRestoreRejected(snapshot, backend, IntSerializer.INSTANCE, fakeStringSerializer);
+
+                       // both serializers wrong
+                       assertRestoreRejected(snapshot, backend, fakeIntSerializer, fakeStringSerializer);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Attempts to restore the given snapshot with the given serializers and fails the
+        * test unless the attempt is rejected with an {@link IllegalArgumentException}.
+        *
+        * @param snapshot The snapshot to restore.
+        * @param backend The backend passed to the restore call.
+        * @param keySerializer The key serializer passed to the restore call.
+        * @param valueSerializer The value serializer passed to the restore call.
+        */
+       private void assertRestoreRejected(
+                       KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot,
+                       MemoryStateBackend backend,
+                       TypeSerializer<Integer> keySerializer,
+                       TypeSerializer<String> valueSerializer) {
+               try {
+                       snapshot.restoreState(backend, keySerializer, valueSerializer,
+                                       null, getClass().getClassLoader());
+                       fail("should recognize wrong serializers");
+               } catch (IllegalArgumentException e) {
+                       // expected
+               } catch (Exception e) {
+                       // report what was actually thrown instead of discarding it
+                       fail("wrong exception: " + e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index d2e5b6a..a65ec01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -79,7 +80,7 @@ public class ZooKeeperTestUtils {
 
                // File system state backend
                config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-               config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, 
fsStateHandlePath + "/checkpoints");
+               
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
fsStateHandlePath + "/checkpoints");
                
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
fsStateHandlePath + "/recovery");
 
                // Akka failure detection and execution retries

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index f0130ec..788f70d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
@@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test
        public void testAdd() throws Exception {
-               // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+               LongStateStorage longStateStorage = new LongStateStorage();
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
+                               ZooKeeper.getClient(), longStateStorage);
 
                // Config
                final String pathInZooKeeper = "/testAdd";
@@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                // Verify
                // State handle created
-               assertEquals(1, stateHandleProvider.getStateHandles().size());
-               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).getState(null));
+               assertEquals(1, store.getAll().size());
+               assertEquals(state, store.get(pathInZooKeeper).getState(null));
 
                // Path created and is persistent
                Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test
        public void testAddWithCreateMode() throws Exception {
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+               LongStateStorage longStateStorage = new LongStateStorage();
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
+                               ZooKeeper.getClient(), longStateStorage);
 
                // Config
                Long state = 3457347234L;
@@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                        // Verify
                        // State handle created
-                       assertEquals(i + 1, 
stateHandleProvider.getStateHandles().size());
-                       assertEquals(state, 
stateHandleProvider.getStateHandles().get(i).getState(null));
+                       assertEquals(i + 1, store.getAll().size());
+                       assertEquals(state, 
longStateStorage.getStateHandles().get(i).getState(null));
 
                        // Path created
                        Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test(expected = Exception.class)
        public void testAddAlreadyExistingPath() throws Exception {
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testAddDiscardStateHandleAfterFailure() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                CuratorFramework client = spy(ZooKeeper.getClient());
                when(client.create()).thenThrow(new RuntimeException("Expected 
test Exception."));
@@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testReplace() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test(expected = Exception.class)
        public void testReplaceNonExistingPath() throws Exception {
-               StateHandleProvider<Long> stateHandleProvider = new 
LongStateHandleProvider();
+               StateStorageHelper<Long> stateStorage = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateStorage);
 
                store.replace("/testReplaceNonExistingPath", 0, 1L);
        }
@@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testReplaceDiscardStateHandleAfterFailure() throws 
Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                CuratorFramework client = spy(ZooKeeper.getClient());
                when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
@@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testGetAndExists() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test(expected = Exception.class)
        public void testGetNonExistingPath() throws Exception {
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testGetAll() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testGetAllSortedByName() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testRemove() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testRemoveWithCallback() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testRemoveAndDiscardState() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        @Test
        public void testRemoveAndDiscardAllState() throws Exception {
                // Setup
-               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+               LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateHandleProvider);
@@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        // Simple test helpers
        // 
---------------------------------------------------------------------------------------------
 
-       private static class LongStateHandleProvider implements 
StateHandleProvider<Long> {
-
-               private static final long serialVersionUID = 
4572084854499402276L;
+       private static class LongStateStorage implements 
StateStorageHelper<Long> {
 
                private final List<LongStateHandle> stateHandles = new 
ArrayList<>();
 
                @Override
-               public StateHandle<Long> createStateHandle(Long state) {
+               public StateHandle<Long> store(Long state) throws Exception {
                        LongStateHandle stateHandle = new 
LongStateHandle(state);
                        stateHandles.add(stateHandle);
 
                        return stateHandle;
                }
 
-               public List<LongStateHandle> getStateHandles() {
+               List<LongStateHandle> getStateHandles() {
                        return stateHandles;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties 
b/flink-runtime/src/test/resources/log4j-test.properties
index 1ca02aa..f77ed07 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')
@@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} 
%-5p %-60c %x - %m
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+log4j.logger.org.apache.flink.runtime.blob=DEBUG

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
 
b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 8b7fb1c..4e4acd2 100644
--- 
a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ 
b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 26e1c9e..98506e0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -63,7 +63,7 @@ import 
org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
@@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment {
         * the key/value state, and for checkpointed functions (implementing 
the interface
         * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
         *
-        * <p>The {@link 
org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example
+        * <p>The {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
         * maintains the state in heap memory, as objects. It is lightweight 
without extra dependencies,
         * but can checkpoint only small states (some counters).
         * 
-        * <p>In contrast, the {@link 
org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+        * <p>In contrast, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
         * stores checkpoints of the state (also maintained as heap objects) in 
files. When using a replicated
         * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will 
guarantee that state is not lost upon
         * failures of individual nodes and that streaming program can be 
executed highly available and strongly

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
index 3817ede..3ac63af 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
@@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 76be598..11bf84f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 0652406..be020d7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -50,7 +50,7 @@ import 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9e60e9a..078679d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.KvState;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index a991fd3..17bd08d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
deleted file mode 100644
index b974674..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for key/value state implementations that are backed by a regular 
heap hash map. The
- * concrete implementations define how the state is checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- * @param <Backend> The type of the backend that snapshots this key/value 
state.
- */
-public abstract class AbstractHeapKvState<K, V, Backend extends 
StateBackend<Backend>> implements KvState<K, V, Backend> {
-
-       /** Map containing the actual key/value pairs */
-       private final HashMap<K, V> state;
-       
-       /** The serializer for the keys */
-       private final TypeSerializer<K> keySerializer;
-
-       /** The serializer for the values */
-       private final TypeSerializer<V> valueSerializer;
-       
-       /** The value that is returned when no other value has been associated 
with a key, yet */
-       private final V defaultValue;
-       
-       /** The current key, which the next value methods will refer to */
-       private K currentKey;
-       
-       /**
-        * Creates a new empty key/value state.
-        * 
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        */
-       protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-                                                                       
TypeSerializer<V> valueSerializer,
-                                                                       V 
defaultValue) {
-               this(keySerializer, valueSerializer, defaultValue, new 
HashMap<K, V>());
-       }
-
-       /**
-        * Creates a new key/value state for the given hash map of key/value 
pairs.
-        * 
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        * @param state The state map to use in this key/value state. May 
contain initial state.   
-        */
-       protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-                                                                       
TypeSerializer<V> valueSerializer,
-                                                                       V 
defaultValue,
-                                                                       
HashMap<K, V> state) {
-               this.state = requireNonNull(state);
-               this.keySerializer = requireNonNull(keySerializer);
-               this.valueSerializer = requireNonNull(valueSerializer);
-               this.defaultValue = defaultValue;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public V value() {
-               V value = state.get(currentKey);
-               return value != null ? value : defaultValue;
-       }
-
-       @Override
-       public void update(V value) {
-               if (value != null) {
-                       state.put(currentKey, value);
-               }
-               else {
-                       state.remove(currentKey);
-               }
-       }
-
-       @Override
-       public void setCurrentKey(K currentKey) {
-               this.currentKey = currentKey;
-       }
-
-       @Override
-       public int size() {
-               return state.size();
-       }
-
-       @Override
-       public void dispose() {
-               state.clear();
-       }
-
-       /**
-        * Gets the serializer for the keys.
-        * @return The serializer for the keys.
-        */
-       public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
-       }
-
-       /**
-        * Gets the serializer for the values.
-        * @return The serializer for the values.
-        */
-       public TypeSerializer<V> getValueSerializer() {
-               return valueSerializer;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  checkpointing utilities
-       // 
------------------------------------------------------------------------
-       
-       protected void writeStateToOutputView(final DataOutputView out) throws 
IOException {
-               for (Map.Entry<K, V> entry : state.entrySet()) {
-                       keySerializer.serialize(entry.getKey(), out);
-                       valueSerializer.serialize(entry.getValue(), out);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
deleted file mode 100644
index 9c628f8..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.state.OperatorState;
-
-/**
- * Key/Value state implementation for user-defined state. The state is backed 
by a state
- * backend, which typically follows one of the following patterns: Either the 
state is stored
- * in the key/value state object directly (meaning in the executing JVM) and 
snapshotted by the
- * state backend into some store (during checkpoints), or the key/value state 
is in fact backed
- * by an external key/value store as the state backend, and checkpoints merely 
record the
- * metadata of what is considered part of the checkpoint.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public interface KvState<K, V, Backend extends StateBackend<Backend>> extends 
OperatorState<V> {
-
-       /**
-        * Sets the current key, which will be used to retrieve values for the 
next calls to
-        * {@link #value()} and {@link #update(Object)}.
-        * 
-        * @param key The key.
-        */
-       void setCurrentKey(K key);
-
-       /**
-        * Creates a snapshot of this state.
-        * 
-        * @param checkpointId The ID of the checkpoint for which the snapshot 
should be created.
-        * @param timestamp The timestamp of the checkpoint.
-        * @return A snapshot handle for this key/value state.
-        * 
-        * @throws Exception Exceptions during snapshotting the state should be 
forwarded, so the system
-        *                   can react to failed snapshots.
-        */
-       KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long 
timestamp) throws Exception;
-
-       /**
-        * Gets the number of key/value pairs currently stored in the state. 
Note that if a key
-        * has been associated with "null", the key is removed from the state 
and will not
-        * be counted here.
-        *
-        * @return The number of key/value pairs currently stored in the state.
-        */
-       int size();
-
-       /**
-        * Disposes the key/value state, releasing all occupied resources.
-        */
-       void dispose();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
deleted file mode 100644
index 6aa7a1e..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * This class represents a snapshot of the {@link KvState}, taken for a 
checkpoint. Where exactly
- * the snapshot stores the snapshot data (in this object, in an external data 
store, etc) depends
- * on the actual implementation. This snapshot defines merely how to restore 
the state and
- * how to discard the state.
- *
- * <p>One possible implementation is that this snapshot simply contains a copy 
of the key/value map.
- * 
- * <p>Another possible implementation for this snapshot is that the key/value 
map is serialized into
- * a file and this snapshot object contains a pointer to that file.
- *
- * @param <K> The type of the key
- * @param <V> The type of the value
- * @param <Backend> The type of the backend that can restore the state from 
this snapshot.
- */
-public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> 
extends java.io.Serializable {
-
-       /**
-        * Loads the key/value state back from this snapshot.
-        * 
-        * 
-        * @param stateBackend The state backend that created this snapshot and 
can restore the key/value state
-        *                     from this snapshot.
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.   
-        * @param classLoader The class loader for user-defined types.
-        * 
-        * @return An instance of the key/value state loaded from this snapshot.
-        * 
-        * @throws Exception Exceptions can occur during the state loading and 
are forwarded. 
-        */
-       KvState<K, V, Backend> restoreState(
-                       Backend stateBackend,
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<V> valueSerializer,
-                       V defaultValue,
-                       ClassLoader classLoader) throws Exception;
-
-
-       /**
-        * Discards the state snapshot, removing any resources occupied by it.
-        * 
-        * @throws Exception Exceptions occurring during the state disposal 
should be forwarded.
-        */
-       void discardState() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
deleted file mode 100644
index 2bbb4e2..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This class represents serialized checkpoint data for a collection of 
elements.
- */
-public class SerializedCheckpointData implements java.io.Serializable {
-
-       private static final long serialVersionUID = -8783744683896503488L;
-       
-       /** ID of the checkpoint for which the IDs are stored */
-       private final long checkpointId;
-
-       /** The serialized elements */
-       private final byte[] serializedData;
-
-       /** The number of elements in the checkpoint */
-       private final int numIds;
-
-       /**
-        * Creates a SerializedCheckpointData object for the given serialized 
data.
-        * 
-        * @param checkpointId The checkpointId of the checkpoint.
-        * @param serializedData The serialized IDs in this checkpoint.
-        * @param numIds The number of IDs in the checkpoint.
-        */
-       public SerializedCheckpointData(long checkpointId, byte[] 
serializedData, int numIds) {
-               this.checkpointId = checkpointId;
-               this.serializedData = serializedData;
-               this.numIds = numIds;
-       }
-
-       /**
-        * Gets the checkpointId of the checkpoint.
-        * @return The checkpointId of the checkpoint.
-        */
-       public long getCheckpointId() {
-               return checkpointId;
-       }
-
-       /**
-        * Gets the binary data for the serialized elements.
-        * @return The binary data for the serialized elements.
-        */
-       public byte[] getSerializedData() {
-               return serializedData;
-       }
-
-       /**
-        * Gets the number of IDs in the checkpoint.
-        * @return The number of IDs in the checkpoint.
-        */
-       public int getNumIds() {
-               return numIds;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Serialize to Checkpoint
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Converts a list of checkpoints with elements into an array of 
SerializedCheckpointData.
-        * 
-        * @param checkpoints The checkpoints to be converted into 
SerializedCheckpointData.
-        * @param serializer The serializer to serialize the IDs.
-        * @param <T> The type of the ID.
-        * @return An array of serializable SerializedCheckpointData, one per 
entry in the given checkpoints deque.
-        * 
-        * @throws IOException Thrown, if the serialization fails.
-        */
-       public static <T> SerializedCheckpointData[] 
fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-                                                                               
                TypeSerializer<T> serializer) throws IOException {
-               return fromDeque(checkpoints, serializer, new 
DataOutputSerializer(128));
-       }
-
-       /**
-        * Converts a list of checkpoints into an array of 
SerializedCheckpointData.
-        *
-        * @param checkpoints The checkpoints to be converted into 
SerializedCheckpointData.
-        * @param serializer The serializer to serialize the IDs.
-        * @param outputBuffer The reusable serialization buffer.
-        * @param <T> The type of the ID.
-        * @return An array of serializable SerializedCheckpointData, one per 
entry in the given checkpoints deque.
-        *
-        * @throws IOException Thrown, if the serialization fails.
-        */
-       public static <T> SerializedCheckpointData[] 
fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-                                                                               
                TypeSerializer<T> serializer,
-                                                                               
                DataOutputSerializer outputBuffer) throws IOException {
-               SerializedCheckpointData[] serializedCheckpoints = new 
SerializedCheckpointData[checkpoints.size()];
-               
-               int pos = 0;
-               for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
-                       outputBuffer.clear();
-                       List<T> checkpointIds = checkpoint.f1;
-                       
-                       for (T id : checkpointIds) {
-                               serializer.serialize(id, outputBuffer);
-                       }
-
-                       serializedCheckpoints[pos++] = new 
SerializedCheckpointData(
-                                       checkpoint.f0, 
outputBuffer.getCopyOfBuffer(), checkpointIds.size());
-               }
-               
-               return serializedCheckpoints;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  De-Serialize from Checkpoint
-       // 
------------------------------------------------------------------------
-
-       /**
-        * De-serializes an array of SerializedCheckpointData back into an 
ArrayDeque of element checkpoints.
-        * 
-        * @param data The data to be deserialized.
-        * @param serializer The serializer used to deserialize the data.
-        * @param <T> The type of the elements.
-        * @return An ArrayDeque of element checkpoints.
-        * 
-        * @throws IOException Thrown, if the deserialization fails.
-        */
-       public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
-                       SerializedCheckpointData[] data, TypeSerializer<T> 
serializer) throws IOException
-       {
-               ArrayDeque<Tuple2<Long, List<T>>> deque = new 
ArrayDeque<>(data.length);
-               DataInputDeserializer deser = null;
-               
-               for (SerializedCheckpointData checkpoint : data) {
-                       byte[] serializedData = checkpoint.getSerializedData();
-                       if (deser == null) {
-                               deser = new 
DataInputDeserializer(serializedData, 0, serializedData.length);
-                       }
-                       else {
-                               deser.setBuffer(serializedData, 0, 
serializedData.length);
-                       }
-                       
-                       final List<T> ids = new 
ArrayList<>(checkpoint.getNumIds());
-                       final int numIds = checkpoint.getNumIds();
-                       
-                       for (int i = 0; i < numIds; i++) {
-                               ids.add(serializer.deserialize(deser));
-                       }
-
-                       deque.addLast(new Tuple2<Long, 
List<T>>(checkpoint.checkpointId, ids));
-               }
-               
-               return deque;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
deleted file mode 100644
index f4391ad..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A state backend defines how state is stored and snapshotted during 
checkpoints.
- * 
- * @param <Backend> The type of backend itself. This generic parameter is used 
to refer to the
- *                  type of backend when creating state backed by this backend.
- */
-public abstract class StateBackend<Backend extends StateBackend<Backend>> 
implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 4620413814639220247L;
-       
-       // 
------------------------------------------------------------------------
-       //  initialization and cleanup
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * This method is called by the task upon deployment to initialize the 
state backend for
-        * data for a specific job.
-        * 
-        * @param job The ID of the job for which the state backend instance 
checkpoints data.
-        * @throws Exception Overwritten versions of this method may throw 
exceptions, in which
-        *                   case the job that uses the state backend is 
considered failed during
-        *                   deployment.
-        */
-       public abstract void initializeForJob(JobID job) throws Exception;
-
-       /**
-        * Disposes all state associated with the current job.
-        * 
-        * @throws Exception Exceptions may occur during disposal of the state 
and should be forwarded.
-        */
-       public abstract void disposeAllStateForCurrentJob() throws Exception;
-
-       /**
-        * Closes the state backend, releasing all internal resources, but does 
not delete any persistent
-        * checkpoint data.
-        * 
-        * @throws Exception Exceptions can be forwarded and will be logged by 
the system
-        */
-       public abstract void close() throws Exception;
-       
-       // 
------------------------------------------------------------------------
-       //  key/value state
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a key/value state backed by this state backend.
-        * 
-        * @param keySerializer The serializer for the key.
-        * @param valueSerializer The serializer for the value.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        * @param <K> The type of the key.
-        * @param <V> The type of the value.
-        * 
-        * @return A new key/value state backed by this backend.
-        * 
-        * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
-        */
-       public abstract <K, V> KvState<K, V, Backend> createKvState(
-                       TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer,
-                       V defaultValue) throws Exception;
-       
-       
-       // 
------------------------------------------------------------------------
-       //  storing state for a checkpoint
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates an output stream that writes into the state of the given 
checkpoint. When the stream
-        * is closed, it returns a state handle that can retrieve the state 
back.
-        * 
-        * @param checkpointID The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        * @return An output stream that writes state for the given checkpoint.
-        * 
-        * @throws Exception Exceptions may occur while creating the stream and 
should be forwarded.
-        */
-       public abstract CheckpointStateOutputStream 
createCheckpointStateOutputStream(
-                       long checkpointID, long timestamp) throws Exception;
-       
-       /**
-        * Creates a {@link DataOutputView} stream that writes into the state 
of the given checkpoint.
-        * When the stream is closed, it returns a state handle that can 
retrieve the state back.
-        *
-        * @param checkpointID The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        * @return A DataOutputView stream that writes state for the given 
checkpoint.
-        *
-        * @throws Exception Exceptions may occur while creating the stream and 
should be forwarded.
-        */
-       public CheckpointStateOutputView createCheckpointStateOutputView(
-                       long checkpointID, long timestamp) throws Exception {
-               return new 
CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, 
timestamp));
-       }
-
-       /**
-        * Writes the given state into the checkpoint, and returns a handle 
that can retrieve the state back.
-        * 
-        * @param state The state to be checkpointed.
-        * @param checkpointID The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        * @param <S> The type of the state.
-        * 
-        * @return A state handle that can retrieve the checkpointed state.
-        * 
-        * @throws Exception Exceptions may occur during serialization / 
storing the state and should be forwarded.
-        */
-       public abstract <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
-                       S state, long checkpointID, long timestamp) throws 
Exception;
-       
-       
-       // 
------------------------------------------------------------------------
-       //  Checkpoint state output stream
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A dedicated output stream that produces a {@link StreamStateHandle} 
when closed.
-        */
-       public static abstract class CheckpointStateOutputStream extends 
OutputStream {
-
-               /**
-                * Closes the stream and gets a state handle that can create an 
input stream
-                * producing the data written to this stream.
-                * 
-                * @return A state handle that can create an input stream 
producing the data written to this stream.
-                * @throws IOException Thrown, if the stream cannot be closed.
-                */
-               public abstract StreamStateHandle closeAndGetHandle() throws 
IOException;
-       }
-
-       /**
-        * A dedicated DataOutputView stream that produces a {@code 
StateHandle<DataInputView>} when closed.
-        */
-       public static final class CheckpointStateOutputView extends 
DataOutputViewStreamWrapper {
-               
-               private final CheckpointStateOutputStream out;
-               
-               public CheckpointStateOutputView(CheckpointStateOutputStream 
out) {
-                       super(out);
-                       this.out = out;
-               }
-
-               /**
-                * Closes the stream and gets a state handle that can create a 
DataInputView
-                * producing the data written to this stream.
-                *
-                * @return A state handle that can create an input stream 
producing the data written to this stream.
-                * @throws IOException Thrown, if the stream cannot be closed.
-                */
-               public StateHandle<DataInputView> closeAndGetHandle() throws 
IOException {
-                       return new DataInputViewHandle(out.closeAndGetHandle());
-               }
-
-               @Override
-               public void close() throws IOException {
-                       out.close();
-               }
-       }
-
-       /**
-        * Simple state handle that resolves a {@link DataInputView} from a 
StreamStateHandle.
-        */
-       private static final class DataInputViewHandle implements 
StateHandle<DataInputView> {
-
-               private static final long serialVersionUID = 
2891559813513532079L;
-               
-               private final StreamStateHandle stream;
-
-               private DataInputViewHandle(StreamStateHandle stream) {
-                       this.stream = stream;
-               }
-
-               @Override
-               public DataInputView getState(ClassLoader userCodeClassLoader) 
throws Exception {
-                       return new 
DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); 
-               }
-
-               @Override
-               public void discardState() throws Exception {
-                       stream.discardState();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
deleted file mode 100644
index ad87eae..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * A factory to create a specific state backend. The state backend creation gets a Configuration
- * object that can be used to read further config values.
- * 
- * @param <T> The type of the state backend created.
- */
-public interface StateBackendFactory<T extends StateBackend<T>> {
-
-       /**
-        * Creates the state backend, optionally using the given configuration.
-        * 
-        * @param config The Flink configuration (loaded by the TaskManager).
-        * @return The created state backend. 
-        * 
-        * @throws Exception Exceptions during instantiation can be forwarded.
-        */
-       StateBackend<T> createFromConfig(Configuration config) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
deleted file mode 100644
index 0fa5952..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that produces an input stream when resolved.
- */
-public interface StreamStateHandle extends StateHandle<InputStream> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
deleted file mode 100644
index c4a376e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-/**
- * Base class for state that is stored in a file.
- */
-public abstract class AbstractFileState implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 350284443258002355L;
-       
-       /** The path to the file in the filesystem, fully describing the file system */
-       private final Path filePath;
-
-       /** Cached file system handle */
-       private transient FileSystem fs;
-
-       /**
-        * Creates a new file state for the given file path.
-        * 
-        * @param filePath The path to the file that stores the state.
-        */
-       protected AbstractFileState(Path filePath) {
-               this.filePath = filePath;
-       }
-
-       /**
-        * Gets the path where this handle's state is stored.
-        * @return The path where this handle's state is stored.
-        */
-       public Path getFilePath() {
-               return filePath;
-       }
-
-       /**
-        * Discard the state by deleting the file that stores the state. If the parent directory
-        * of the state is empty after deleting the state file, it is also deleted.
-        * 
-        * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
-        */
-       public void discardState() throws Exception {
-               getFileSystem().delete(filePath, false);
-
-               // send a call to delete the directory containing the file. this will
-               // fail (and be ignored) when some files still exist
-               try {
-                       getFileSystem().delete(filePath.getParent(), false);
-               } catch (IOException ignored) {}
-       }
-
-       /**
-        * Gets the file system that stores the file state.
-        * @return The file system that stores the file state.
-        * @throws IOException Thrown if the file system cannot be accessed.
-        */
-       protected FileSystem getFileSystem() throws IOException {
-               if (fs == null) {
-                       fs = FileSystem.get(filePath.toUri());
-               }
-               return fs;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
deleted file mode 100644
index 9bf5ec1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.ObjectInputStream;
-
-/**
- * A state handle that points to state stored in a file via Java Serialization.
- * 
- * @param <T> The type of state pointed to by the state handle.
- */
-public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> {
-
-       private static final long serialVersionUID = -657631394290213622L;
-       
-       /**
-        * Creates a new FileSerializableStateHandle pointing to state at the given file path.
-        * 
-        * @param filePath The path to the file containing the checkpointed state.
-        */
-       public FileSerializableStateHandle(Path filePath) {
-               super(filePath);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public T getState(ClassLoader classLoader) throws Exception {
-               FSDataInputStream inStream = getFileSystem().open(getFilePath());
-               ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
-               return (T) ois.readObject();
-       }
-}

Reply via email to