Repository: flink
Updated Branches:
  refs/heads/release-1.2 b703a24d4 -> c6a807250


http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
new file mode 100644
index 0000000..255bd46
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AsyncFileStateBackendTest extends 
StateBackendTestBase<AsyncFsStateBackend> {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Override
+       protected AsyncFsStateBackend getStateBackend() throws Exception {
+               File checkpointPath = tempFolder.newFolder();
+               return new AsyncFsStateBackend(localFileUri(checkpointPath));
+       }
+
+       // disable these because the verification does not work for this state backend
+       @Override
+       @Test
+       public void testValueStateRestoreWithWrongSerializers() {}
+
+       @Override
+       @Test
+       public void testListStateRestoreWithWrongSerializers() {}
+
+       @Override
+       @Test
+       public void testReducingStateRestoreWithWrongSerializers() {}
+
+       @Test
+       public void testStateOutputStream() throws IOException {
+               File basePath = tempFolder.newFolder().getAbsoluteFile();
+
+               try {
+                       // the state backend has a very low in-memory state threshold (15 bytes)
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new FsStateBackend(basePath.toURI(), 
15));
+                       JobID jobId = new JobID();
+
+                       // we know how FsCheckpointStreamFactory is implemented so we know where it
+                       // will store checkpoints
+                       File checkpointPath = new 
File(basePath.getAbsolutePath(), jobId.toString());
+
+                       CheckpointStreamFactory streamFactory = 
backend.createStreamFactory(jobId, "test_op");
+
+                       byte[] state1 = new byte[1274673];
+                       byte[] state2 = new byte[1];
+                       byte[] state3 = new byte[0];
+                       byte[] state4 = new byte[177];
+
+                       Random rnd = new Random();
+                       rnd.nextBytes(state1);
+                       rnd.nextBytes(state2);
+                       rnd.nextBytes(state3);
+                       rnd.nextBytes(state4);
+
+                       long checkpointId = 97231523452L;
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream1 =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream2 =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream3 =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
+
+                       stream1.write(state1);
+                       stream2.write(state2);
+                       stream3.write(state3);
+
+                       FileStateHandle handle1 = (FileStateHandle) 
stream1.closeAndGetHandle();
+                       ByteStreamStateHandle handle2 = (ByteStreamStateHandle) 
stream2.closeAndGetHandle();
+                       ByteStreamStateHandle handle3 = (ByteStreamStateHandle) 
stream3.closeAndGetHandle();
+
+                       // use with try-with-resources
+                       StreamStateHandle handle4;
+                       try 
(CheckpointStreamFactory.CheckpointStateOutputStream stream4 =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis())) {
+                               stream4.write(state4);
+                               handle4 = stream4.closeAndGetHandle();
+                       }
+
+                       // close before accessing handle
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream5 =
+                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
+                       stream5.write(state4);
+                       stream5.close();
+                       try {
+                               stream5.closeAndGetHandle();
+                               fail();
+                       } catch (IOException e) {
+                               // expected: closeAndGetHandle() must fail on an already closed stream
+                       }
+
+                       validateBytesInStream(handle1.openInputStream(), 
state1);
+                       handle1.discardState();
+                       assertFalse(isDirectoryEmpty(basePath));
+                       ensureLocalFileDeleted(handle1.getFilePath());
+
+                       validateBytesInStream(handle2.openInputStream(), 
state2);
+                       handle2.discardState();
+
+                       // no bytes were written to the stream, so closing it returns no handle
+                       assertNull(handle3);
+
+                       validateBytesInStream(handle4.openInputStream(), 
state4);
+                       handle4.discardState();
+                       assertTrue(isDirectoryEmpty(checkpointPath));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       private static void ensureLocalFileDeleted(Path path) {
+               URI uri = path.toUri();
+               if ("file".equals(uri.getScheme())) {
+                       File file = new File(uri.getPath());
+                       assertFalse("file not properly deleted", file.exists());
+               }
+               else {
+                       throw new IllegalArgumentException("not a local path");
+               }
+       }
+
+       private static void deleteDirectorySilently(File dir) {
+               try {
+                       FileUtils.deleteDirectory(dir);
+               }
+               catch (IOException ignored) {}
+       }
+
+       private static boolean isDirectoryEmpty(File directory) {
+               if (!directory.exists()) {
+                       return true;
+               }
+               String[] nested = directory.list();
+               return nested == null || nested.length == 0;
+       }
+
+       private static String localFileUri(File path) {
+               return path.toURI().toString();
+       }
+
+       private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {
+               try {
+                       byte[] holder = new byte[data.length];
+
+                       int pos = 0;
+                       int read;
+                       while (pos < holder.length && (read = is.read(holder, 
pos, holder.length - pos)) != -1) {
+                               pos += read;
+                       }
+
+                       assertEquals("not enough data", holder.length, pos);
+                       assertEquals("too much data", -1, is.read());
+                       assertArrayEquals("wrong data", data, holder);
+               } finally {
+                       is.close();
+               }
+       }
+
+       @Test
+       public void testConcurrentMapIfQueryable() throws Exception {
+               //unsupported
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
new file mode 100644
index 0000000..b1a323b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.async.AsyncMemoryStateBackend;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link AsyncMemoryStateBackend}.
+ */
+public class AsyncMemoryStateBackendTest extends 
StateBackendTestBase<AsyncMemoryStateBackend> {
+
+       @Override
+       protected AsyncMemoryStateBackend getStateBackend() throws Exception {
+               return new AsyncMemoryStateBackend();
+       }
+
+       // disable these because the verification does not work for this state backend
+       @Override
+       @Test
+       public void testValueStateRestoreWithWrongSerializers() {}
+
+       @Override
+       @Test
+       public void testListStateRestoreWithWrongSerializers() {}
+
+       @Override
+       @Test
+       public void testReducingStateRestoreWithWrongSerializers() {}
+
+       @Test
+       @SuppressWarnings("unchecked, deprecation")
+       public void testNumStateEntries() throws Exception {
+               KeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               AsyncHeapKeyedStateBackend<Integer> heapBackend = 
(AsyncHeapKeyedStateBackend<Integer>) backend;
+
+               assertEquals(0, heapBackend.numStateEntries());
+
+               ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               backend.setCurrentKey(0);
+               state.update("hello");
+               state.update("ciao");
+
+               assertEquals(1, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(42);
+               state.update("foo");
+
+               assertEquals(2, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(0);
+               state.clear();
+
+               assertEquals(1, heapBackend.numStateEntries());
+
+               backend.setCurrentKey(42);
+               state.clear();
+
+               assertEquals(0, heapBackend.numStateEntries());
+
+               backend.dispose();
+       }
+
+       @Test
+       public void testOversizedState() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend(10);
+                       CheckpointStreamFactory streamFactory = 
backend.createStreamFactory(new JobID(), "test_op");
+
+                       HashMap<String, Integer> state = new HashMap<>();
+                       state.put("hey there", 2);
+                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
+
+                       try {
+                               
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+                                               
streamFactory.createCheckpointStateOutputStream(12, 459);
+
+                               ObjectOutputStream oos = new 
ObjectOutputStream(outStream);
+                               oos.writeObject(state);
+
+                               oos.flush();
+
+                               outStream.closeAndGetHandle();
+
+                               fail("this should cause an exception");
+                       }
+                       catch (IOException e) {
+                               // expected: writing the oversized state must fail with an IOException
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testStateStream() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend();
+                       CheckpointStreamFactory streamFactory = 
backend.createStreamFactory(new JobID(), "test_op");
+
+                       HashMap<String, Integer> state = new HashMap<>();
+                       state.put("hey there", 2);
+                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream os 
= streamFactory.createCheckpointStateOutputStream(1, 2);
+                       ObjectOutputStream oos = new ObjectOutputStream(os);
+                       oos.writeObject(state);
+                       oos.flush();
+                       StreamStateHandle handle = os.closeAndGetHandle();
+
+                       assertNotNull(handle);
+
+                       try (ObjectInputStream ois = new 
ObjectInputStream(handle.openInputStream())) {
+                               assertEquals(state, ois.readObject());
+                               assertTrue(ois.available() <= 0);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testOversizedStateStream() {
+               try {
+                       MemoryStateBackend backend = new MemoryStateBackend(10);
+                       CheckpointStreamFactory streamFactory = 
backend.createStreamFactory(new JobID(), "test_op");
+
+                       HashMap<String, Integer> state = new HashMap<>();
+                       state.put("hey there", 2);
+                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream os 
= streamFactory.createCheckpointStateOutputStream(1, 2);
+                       ObjectOutputStream oos = new ObjectOutputStream(os);
+
+                       try {
+                               oos.writeObject(state);
+                               oos.flush();
+                               os.closeAndGetHandle();
+                               fail("this should cause an exception");
+                       }
+                       catch (IOException e) {
+                               // expected: the oversized state must be rejected with an IOException
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testConcurrentMapIfQueryable() throws Exception {
+               //unsupported
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index c267afc..b196e71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -61,7 +61,7 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
        public void testReducingStateRestoreWithWrongSerializers() {}
 
        @Test
-       @SuppressWarnings("unchecked")
+       @SuppressWarnings("unchecked, deprecation")
        public void testNumStateEntries() throws Exception {
                KeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index e821bcf..61de1e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -48,9 +49,13 @@ import 
org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -60,6 +65,7 @@ import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -1432,6 +1438,150 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
        }
 
+       @Test
+       public void testAsyncSnapshot() throws Exception {
+               OneShotLatch waiter = new OneShotLatch();
+               BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+               streamFactory.setWaiterLatch(waiter);
+
+               AbstractKeyedStateBackend<Integer> backend = null;
+               KeyGroupsStateHandle stateHandle = null;
+
+               try {
+                       backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+                       if (!(backend instanceof AsyncHeapKeyedStateBackend)) {
+                               return;
+                       }
+
+                       ValueState<Integer> valueState = 
backend.createValueState(
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       
((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       for (int i = 0; i < 10; ++i) {
+                               backend.setCurrentKey(i);
+                               valueState.update(i);
+                       }
+
+                       RunnableFuture<KeyGroupsStateHandle> snapshot =
+                               backend.snapshot(0L, 0L, streamFactory);
+                       Thread runner = new Thread(snapshot);
+                       runner.start();
+                       for (int i = 0; i < 20; ++i) {
+                               backend.setCurrentKey(i);
+                               valueState.update(i + 1);
+                               if (10 == i) {
+                                       waiter.await();
+                               }
+                       }
+
+                       runner.join();
+                       stateHandle = snapshot.get();
+
+                       // test isolation
+                       for (int i = 0; i < 20; ++i) {
+                               backend.setCurrentKey(i);
+                               Assert.assertEquals(i + 1, (int) 
valueState.value());
+                       }
+
+               } finally {
+                       if (null != backend) {
+                               IOUtils.closeQuietly(backend);
+                               backend.dispose();
+                       }
+               }
+
+               Assert.assertNotNull(stateHandle);
+
+               backend = createKeyedBackend(IntSerializer.INSTANCE);
+               try {
+                       backend.restore(Collections.singleton(stateHandle));
+                       ValueState<Integer> valueState = 
backend.createValueState(
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       
((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       for (int i = 0; i < 10; ++i) {
+                               backend.setCurrentKey(i);
+                               Assert.assertEquals(i, (int) 
valueState.value());
+                       }
+
+                       backend.setCurrentKey(11);
+                       Assert.assertEquals(null, valueState.value());
+               } finally {
+                       if (null != backend) {
+                               IOUtils.closeQuietly(backend);
+                               backend.dispose();
+                       }
+               }
+       }
+
+       @Test
+       public void testAsyncSnapshotCancellation() throws Exception {
+               OneShotLatch blocker = new OneShotLatch();
+               OneShotLatch waiter = new OneShotLatch();
+               BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+               streamFactory.setWaiterLatch(waiter);
+               streamFactory.setBlockerLatch(blocker);
+               streamFactory.setAfterNumberInvocations(100);
+
+               AbstractKeyedStateBackend<Integer> backend = null;
+               try {
+                       backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+                       if (!(backend instanceof AsyncHeapKeyedStateBackend)) {
+                               return;
+                       }
+
+                       ValueState<Integer> valueState = 
backend.createValueState(
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       
((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       for (int i = 0; i < 10; ++i) {
+                               backend.setCurrentKey(i);
+                               valueState.update(i);
+                       }
+
+                       RunnableFuture<KeyGroupsStateHandle> snapshot =
+                               backend.snapshot(0L, 0L, streamFactory);
+
+                       Thread runner = new Thread(snapshot);
+                       runner.start();
+
+                       // wait until the snapshot thread has reached the checkpoint stream
+                       waiter.await();
+
+                       // close the backend to see if the close is propagated to the stream
+                       backend.close();
+
+                       // unblock the stream so that it can run into the IOException
+                       blocker.trigger();
+
+                       //dispose the backend
+                       backend.dispose();
+
+                       runner.join();
+
+                       try {
+                               snapshot.get();
+                               fail("Close was not propagated.");
+                       } catch (ExecutionException ex) {
+                               // expected: the close was propagated, so the snapshot future fails
+                       }
+
+               } finally {
+                       if (null != backend) {
+                               IOUtils.closeQuietly(backend);
+                               backend.dispose();
+                       }
+               }
+       }
+
        private static class AppendingReduce implements ReduceFunction<String> {
                @Override
                public String reduce(String value1, String value2) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
new file mode 100644
index 0000000..fb36d67
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class CopyOnWriteStateTableTest {
+
+       /**
+        * Testing the basic map operations.
+        */
+       @Test
+       public void testPutGetRemoveContainsTransform() throws Exception {
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+
+               ArrayList<Integer> state_1_1 = new ArrayList<>();
+               state_1_1.add(41);
+               ArrayList<Integer> state_2_1 = new ArrayList<>();
+               state_2_1.add(42);
+               ArrayList<Integer> state_1_2 = new ArrayList<>();
+               state_1_2.add(43);
+
+               Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1));
+               Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+               Assert.assertEquals(1, stateTable.size());
+
+               Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1));
+               Assert.assertEquals(state_2_1, stateTable.get(2, 1));
+               Assert.assertEquals(2, stateTable.size());
+
+               Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2));
+               Assert.assertEquals(state_1_2, stateTable.get(1, 2));
+               Assert.assertEquals(3, stateTable.size());
+
+               Assert.assertTrue(stateTable.containsKey(2, 1));
+               Assert.assertFalse(stateTable.containsKey(3, 1));
+               Assert.assertFalse(stateTable.containsKey(2, 3));
+               stateTable.put(2, 1, null);
+               Assert.assertTrue(stateTable.containsKey(2, 1));
+               Assert.assertEquals(3, stateTable.size());
+               Assert.assertNull(stateTable.get(2, 1));
+               stateTable.put(2, 1, state_2_1);
+               Assert.assertEquals(3, stateTable.size());
+
+               Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 
1));
+               Assert.assertFalse(stateTable.containsKey(2, 1));
+               Assert.assertEquals(2, stateTable.size());
+
+               stateTable.remove(1, 2);
+               Assert.assertFalse(stateTable.containsKey(1, 2));
+               Assert.assertEquals(1, stateTable.size());
+
+               Assert.assertNull(stateTable.removeAndGetOld(4, 2));
+               Assert.assertEquals(1, stateTable.size());
+
+               StateTransformationFunction<ArrayList<Integer>, Integer> 
function =
+                               new 
StateTransformationFunction<ArrayList<Integer>, Integer>() {
+                                       @Override
+                                       public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                               previousState.add(value);
+                                               return previousState;
+                                       }
+                               };
+
+               final int value = 4711;
+               stateTable.transform(1, 1, value, function);
+               state_1_1 = function.apply(state_1_1, value);
+               Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+       }
+
+       /**
+        * Drives the table into an incremental rehash through interleaved inserts and removes, and checks
+        * that neither the size bookkeeping nor the key lookups are corrupted while it is in progress.
+        */
+       @Test
+       public void testIncrementalRehash() {
+               final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> stateMetaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // mutable state objects on purpose
+
+               final MockInternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> table =
+                               new CopyOnWriteStateTable<>(mockKeyContext, stateMetaInfo);
+
+               int nextInsertKey = 0;
+               int nextRemoveKey = 0;
+
+               // first pass runs until the table reports a rehash, second pass until it no longer does
+               for (boolean rehashingPhase : new boolean[] {false, true}) {
+                       while (table.isRehashing() == rehashingPhase) {
+                               table.put(nextInsertKey, 0, new ArrayList<Integer>());
+                               ++nextInsertKey;
+                               // every eighth insert is followed by removing the oldest key that is still present
+                               if (nextInsertKey % 8 == 0) {
+                                       table.remove(nextRemoveKey, 0);
+                                       ++nextRemoveKey;
+                               }
+                       }
+                       Assert.assertEquals(nextInsertKey - nextRemoveKey, table.size());
+               }
+
+               // keys below the remove cursor must be gone, all later keys must still be contained
+               for (int key = 0; key < nextInsertKey; ++key) {
+                       final boolean present = table.containsKey(key, 0);
+                       if (key >= nextRemoveKey) {
+                               Assert.assertTrue(present);
+                       } else {
+                               Assert.assertFalse(present);
+                       }
+               }
+       }
+
+       /**
+        * Applies a long, reproducible sequence of random modifications (fixed seed 42) to a state table
+        * and mirrors every operation on a plain {@link HashMap} that serves as the reference.
+        *
+        * <p>Each iteration picks a key in [0, 20), a namespace in [0, 4) and one of seven operations
+        * (get-or-insert, put, putAndGetOld, remove, removeAndGetOld, transform). After the operation the
+        * sizes of table and reference must match; any state object handed out is then mutated in both
+        * worlds and compared again.
+        *
+        * <p>Every 500 iterations the snapshot handling runs: if no snapshot is held, one is drawn together
+        * with a deep dump of the reference map; otherwise the held snapshot is compared against that dump.
+        * Every 1,000 iterations an additional snapshot is drawn and immediately released while the old one
+        * is still held, and every 5,000 iterations the held snapshot is released.
+        */
+       @Test
+       public void testRandomModificationsAndCopyOnWriteIsolation() throws 
Exception {
+
+               final RegisteredBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+
+               final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> 
referenceMap = new HashMap<>();
+
+               final Random random = new Random(42);
+
+               // holds the currently referenced snapshot of the table under test (null if none is held)
+               CopyOnWriteStateTable.StateTableEntry<Integer, Integer, 
ArrayList<Integer>>[] snapshot = null;
+               int snapshotSize = 0;
+
+               // holds a reference snapshot from our reference map that we 
compare against
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null;
+
+               int val = 0;
+
+
+               int snapshotCounter = 0;
+               int referencedSnapshotId = 0;
+
+               final StateTransformationFunction<ArrayList<Integer>, Integer> 
transformationFunction =
+                               new 
StateTransformationFunction<ArrayList<Integer>, Integer>() {
+                                       @Override
+                                       public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                               if (previousState == null) {
+                                                       previousState = new 
ArrayList<>();
+                                               }
+                                               previousState.add(value);
+                                               // we give back the original, 
attempting to spot errors in to copy-on-write
+                                               return previousState;
+                                       }
+                               };
+
+               // the main loop: one random operation per iteration, mirrored on the reference map
+               for (int i = 0; i < 10_000_000; ++i) {
+
+                       int key = random.nextInt(20);
+                       int namespace = random.nextInt(4);
+                       Tuple2<Integer, Integer> compositeKey = new 
Tuple2<>(key, namespace);
+
+                       int op = random.nextInt(7);
+
+                       ArrayList<Integer> state = null;
+                       ArrayList<Integer> referenceState = null;
+
+                       switch (op) {
+                               case 0:
+                               case 1: {
+                                       state = stateTable.get(key, namespace);
+                                       referenceState = 
referenceMap.get(compositeKey);
+                                       if (null == state) {
+                                               state = new ArrayList<>();
+                                               stateTable.put(key, namespace, 
state);
+                                               referenceState = new 
ArrayList<>();
+                                               referenceMap.put(compositeKey, 
referenceState);
+                                       }
+                                       break;
+                               }
+                               case 2: {
+                                       stateTable.put(key, namespace, new 
ArrayList<Integer>());
+                                       referenceMap.put(compositeKey, new 
ArrayList<Integer>());
+                                       break;
+                               }
+                               case 3: {
+                                       state = stateTable.putAndGetOld(key, 
namespace, new ArrayList<Integer>());
+                                       referenceState = 
referenceMap.put(compositeKey, new ArrayList<Integer>());
+                                       break;
+                               }
+                               case 4: {
+                                       stateTable.remove(key, namespace);
+                                       referenceMap.remove(compositeKey);
+                                       break;
+                               }
+                               case 5: {
+                                       state = stateTable.removeAndGetOld(key, 
namespace);
+                                       referenceState = 
referenceMap.remove(compositeKey);
+                                       break;
+                               }
+                               case 6: {
+                                       final int updateValue = 
random.nextInt(1000);
+                                       stateTable.transform(key, namespace, 
updateValue, transformationFunction);
+                                       referenceMap.put(compositeKey, 
transformationFunction.apply(
+                                                       
referenceMap.remove(compositeKey), updateValue));
+                                       break;
+                               }
+                               default: {
+                                       Assert.fail("Unknown op-code " + op);
+                               }
+                       }
+
+                       Assert.assertEquals(referenceMap.size(), 
stateTable.size());
+
+                       if (state != null) {
+                               // mutate the states a bit: randomly drop the last element or append a fresh value
+                               if (random.nextBoolean() && !state.isEmpty()) {
+                                       state.remove(state.size() - 1);
+                                       
referenceState.remove(referenceState.size() - 1);
+                               } else {
+                                       state.add(val);
+                                       referenceState.add(val);
+                                       ++val;
+                               }
+                       }
+
+                       Assert.assertEquals(referenceState, state);
+
+                       // snapshot triggering / comparison / release (every 500 iterations)
+                       if (i > 0 && i % 500 == 0) {
+
+                               if (snapshot != null) {
+                                       // the held snapshot must still equal the reference dump taken when it was drawn
+                                       deepCheck(reference, convert(snapshot, 
snapshotSize));
+
+                                       if (i % 1_000 == 0) {
+                                               // draw and release some other 
snapshot while holding on the old snapshot
+                                               ++snapshotCounter;
+                                               
stateTable.snapshotTableArrays();
+                                               
stateTable.releaseSnapshot(snapshotCounter);
+                                       }
+
+                                       // release the referenced snapshot every 5,000 iterations
+                                       if (i % 5_000 == 0) {
+                                               snapshot = null;
+                                               reference = null;
+                                               snapshotSize = 0;
+                                               
stateTable.releaseSnapshot(referencedSnapshotId);
+                                       }
+
+                               } else {
+                                       // if there is no more referenced 
snapshot, we create one
+                                       ++snapshotCounter;
+                                       referencedSnapshotId = snapshotCounter;
+                                       snapshot = 
stateTable.snapshotTableArrays();
+                                       snapshotSize = stateTable.size();
+                                       reference = 
manualDeepDump(referenceMap);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Tests the copy-on-write contracts of the state table with respect to snapshots.
+        *
+        * <p>The checks are, in order: without a snapshot a lookup hands out the originally inserted
+        * object; after a snapshot a lookup hands out an equal copy; repeated lookups keep returning that
+        * same copy until the next snapshot is taken; entries inserted after a snapshot are not copied on
+        * behalf of that snapshot; and once all snapshots are released, lookups hand out the stored
+        * object again.
+        *
+        * <p>Identity is checked with {@code assertSame} / {@code assertNotSame} so that a failure reports
+        * the objects involved rather than a bare boolean.
+        */
+       @Test
+       public void testCopyOnWriteContracts() {
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               ArrayList<Integer> originalState1 = new ArrayList<>(1);
+               ArrayList<Integer> originalState2 = new ArrayList<>(1);
+               ArrayList<Integer> originalState3 = new ArrayList<>(1);
+               ArrayList<Integer> originalState4 = new ArrayList<>(1);
+               ArrayList<Integer> originalState5 = new ArrayList<>(1);
+
+               originalState1.add(1);
+               originalState2.add(2);
+               originalState3.add(3);
+               originalState4.add(4);
+               originalState5.add(5);
+
+               // state 3 is deliberately left out here; it is inserted after the first snapshot
+               stateTable.put(1, 1, originalState1);
+               stateTable.put(2, 1, originalState2);
+               stateTable.put(4, 1, originalState4);
+               stateTable.put(5, 1, originalState5);
+
+               // no snapshot taken, we get the original back
+               Assert.assertSame(originalState1, stateTable.get(1, 1));
+               CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+               // after snapshot1 is taken, we get a copy...
+               final ArrayList<Integer> copyState = stateTable.get(1, 1);
+               Assert.assertNotSame(originalState1, copyState);
+               // ...and the copy is equal
+               Assert.assertEquals(originalState1, copyState);
+
+               // we make an insert AFTER snapshot1
+               stateTable.put(3, 1, originalState3);
+
+               // on repeated lookups, we get the same copy because no further snapshot was taken
+               Assert.assertSame(copyState, stateTable.get(1, 1));
+
+               // we take snapshot2
+               CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+               // after the second snapshot, copy-on-write is active again for old entries
+               Assert.assertNotSame(copyState, stateTable.get(1, 1));
+               // and equality still holds
+               Assert.assertEquals(copyState, stateTable.get(1, 1));
+
+               // after releasing snapshot2
+               stateTable.releaseSnapshot(snapshot2);
+               // we still get the original of the untouched late insert (after snapshot1)
+               Assert.assertSame(originalState3, stateTable.get(3, 1));
+               // but copy-on-write is still active for older inserts (before snapshot1)
+               Assert.assertNotSame(originalState4, stateTable.get(4, 1));
+
+               // after releasing snapshot1
+               stateTable.releaseSnapshot(snapshot1);
+               // no copy-on-write is active
+               Assert.assertSame(originalState5, stateTable.get(5, 1));
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <K, N, S> Tuple3<K, N, S>[] 
convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) 
{
+
+               Tuple3<K, N, S>[] result = new Tuple3[mapSize];
+               int pos = 0;
+               for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : 
snapshot) {
+                       while (null != entry) {
+                               result[pos++] = new Tuple3<>(entry.getKey(), 
entry.getNamespace(), entry.getState());
+                               entry = entry.next;
+                       }
+               }
+               Assert.assertEquals(mapSize, pos);
+               return result;
+       }
+
+       @SuppressWarnings("unchecked")
+       private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
+                       HashMap<Tuple2<Integer, Integer>,
+                                       ArrayList<Integer>> map) {
+
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new 
Tuple3[map.size()];
+               int pos = 0;
+               for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> 
entry : map.entrySet()) {
+                       Integer key = entry.getKey().f0;
+                       Integer namespace = entry.getKey().f1;
+                       result[pos++] = new Tuple3<>(key, namespace, new 
ArrayList<>(entry.getValue()));
+               }
+               return result;
+       }
+
+       private void deepCheck(
+                       Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
+                       Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
+
+               if (a == b) {
+                       return;
+               }
+
+               Assert.assertEquals(a.length, b.length);
+
+               Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> 
comparator =
+                               new Comparator<Tuple3<Integer, Integer, 
ArrayList<Integer>>>() {
+
+                                       @Override
+                                       public int compare(Tuple3<Integer, 
Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> 
o2) {
+                                               int namespaceDiff = o1.f1 - 
o2.f1;
+                                               return namespaceDiff != 0 ? 
namespaceDiff : o1.f0 - o2.f0;
+                                       }
+                               };
+
+               Arrays.sort(a, comparator);
+               Arrays.sort(b, comparator);
+
+               for (int i = 0; i < a.length; ++i) {
+                       Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i];
+                       Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i];
+
+                       Assert.assertEquals(av.f0, bv.f0);
+                       Assert.assertEquals(av.f1, bv.f1);
+                       Assert.assertEquals(av.f2, bv.f2);
+               }
+       }
+
+       /**
+        * Bare-bones {@link InternalKeyContext} for tests: it reports a single key group (index 0, range
+        * [0, 0]) and hands back whatever key was last set via {@link #setKey(Object)}.
+        */
+       static class MockInternalKeyContext<T> implements InternalKeyContext<T> {
+
+               /** Serializer handed out by {@link #getKeySerializer()}. */
+               private final TypeSerializer<T> serializer;
+
+               /** The fixed key-group range [0, 0]. */
+               private final KeyGroupRange keyGroupRange;
+
+               /** The current key; {@code null} until {@link #setKey(Object)} is called. */
+               private T key;
+
+               public MockInternalKeyContext(TypeSerializer<T> serializer) {
+                       this.serializer = serializer;
+                       this.keyGroupRange = new KeyGroupRange(0, 0);
+               }
+
+               // ---- current key ----
+
+               public void setKey(T key) {
+                       this.key = key;
+               }
+
+               @Override
+               public T getCurrentKey() {
+                       return key;
+               }
+
+               @Override
+               public TypeSerializer<T> getKeySerializer() {
+                       return serializer;
+               }
+
+               // ---- key groups: always exactly one ----
+
+               @Override
+               public int getNumberOfKeyGroups() {
+                       return 1;
+               }
+
+               @Override
+               public int getCurrentKeyGroupIndex() {
+                       return 0;
+               }
+
+               @Override
+               public KeyGroupRange getKeyGroupRange() {
+                       return keyGroupRange;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java
new file mode 100644
index 0000000..a7c2d15
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link 
ListState}.
+ */
+@SuppressWarnings("unchecked")
+public class HeapListStateTest extends HeapStateBackendTestBase {
+
+       @Test
+       public void testAddAndGet() throws Exception {
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final AsyncHeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       ListState<Long> state =
+                                       
keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+                       AbstractHeapMergingState<Long, VoidNamespace, ?, ?, ?, 
?, ?> mergingState =
+                               (AbstractHeapMergingState<Long, VoidNamespace, 
?, ?, ?, ?, ?>) state;
+
+                       
mergingState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+                       state.add(17L);
+                       state.add(11L);
+                       assertEquals(asList(17L, 11L), state.get());
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertNull(state.get());
+                       state.add(1L);
+                       state.add(2L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertEquals(asList(17L, 11L), state.get());
+                       state.clear();
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       state.add(3L);
+                       state.add(2L);
+                       state.add(1L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
+                       state.clear();
+
+                       // make sure all lists / maps are cleared
+
+                       StateTable<String, VoidNamespace, ArrayList<Long>> 
stateTable =
+                                       ((HeapListState<String, VoidNamespace, 
Long>) state).getStateTable();
+
+                       assertTrue(mergingState.getStateTable().isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       @Test
+       public void testMerging() throws Exception {
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final Integer namespace1 = 1;
+               final Integer namespace2 = 2;
+               final Integer namespace3 = 3;
+
+               final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 
33L, 44L, 55L));
+
+               final AsyncHeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       ListState<Long> state = 
keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+                       AbstractHeapMergingState<Long, Integer, ?, ?, ?, ?, ?> 
mergingState =
+                               (AbstractHeapMergingState<Long, Integer, ?, ?, 
?, ?, ?>) state;
+
+                       // populate the different namespaces
+                       //  - abc spreads the values over three namespaces
+                       //  - def spreads teh values over two namespaces (one 
empty)
+                       //  - ghi is empty
+                       //  - jkl has all elements already in the target 
namespace
+                       //  - mno has all elements already in one source 
namespace
+
+                       keyedBackend.setCurrentKey("abc");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.add(33L);
+                       state.add(55L);
+
+                       mergingState.setCurrentNamespace(namespace2);
+                       state.add(22L);
+                       state.add(11L);
+
+                       mergingState.setCurrentNamespace(namespace3);
+                       state.add(44L);
+
+                       keyedBackend.setCurrentKey("def");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(44L);
+
+                       mergingState.setCurrentNamespace(namespace3);
+                       state.add(22L);
+                       state.add(55L);
+                       state.add(33L);
+
+                       keyedBackend.setCurrentKey("jkl");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       keyedBackend.setCurrentKey("mno");
+                       mergingState.setCurrentNamespace(namespace3);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       keyedBackend.setCurrentKey("abc");
+                       //TODO
+                       mergingState.mergeNamespaces(namespace1, 
asList(namespace2, namespace3));
+                       mergingState.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("def");
+                       mergingState.mergeNamespaces(namespace1, 
asList(namespace2, namespace3));
+                       mergingState.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("ghi");
+                       mergingState.mergeNamespaces(namespace1, 
asList(namespace2, namespace3));
+                       mergingState.setCurrentNamespace(namespace1);
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("jkl");
+                       mergingState.mergeNamespaces(namespace1, 
asList(namespace2, namespace3));
+                       mergingState.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("mno");
+                       mergingState.mergeNamespaces(namespace1, 
asList(namespace2, namespace3));
+                       mergingState.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       // make sure all lists / maps are cleared
+
+                       keyedBackend.setCurrentKey("abc");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("def");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("ghi");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("jkl");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("mno");
+                       mergingState.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       assertTrue(mergingState.getStateTable().isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+       
+       private static <T> void validateResult(Iterable<T> values, Set<T> 
expected) {
+               int num = 0;
+               for (T v : values) {
+                       num++;
+                       assertTrue(expected.contains(v));
+               }
+
+               assertEquals(expected.size(), num);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java
new file mode 100644
index 0000000..5da0fef
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link 
ReducingState}.
+ */
+@SuppressWarnings("unchecked")
+public class HeapReducingStateTest extends HeapStateBackendTestBase {
+
+       /**
+        * Adds values under several keys in the {@link VoidNamespace} and checks that each
+        * key only sees its own reduced value, that {@code clear()} removes the value of the
+        * current key, and that the state table is empty once every written key was cleared.
+        */
+       @Test
+       public void testAddAndGet() throws Exception {
+
+               final ReducingStateDescriptor<Long> stateDescr =
+                               new ReducingStateDescriptor<>("my-state", new 
AddingFunction(), Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final AsyncHeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       ReducingState<Long> reducingState =
+                               
keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+                       // same object, viewed through the heap-state type to reach
+                       // setCurrentNamespace() and getStateTable()
+                       AbstractHeapMergingState<Long, VoidNamespace, ?, ?, ?, 
?, ?> state =
+                               (AbstractHeapMergingState<Long, VoidNamespace, 
?, ?, ?, ?, ?>) reducingState;
+
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       // nothing was added yet, so every key reads null
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(reducingState.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(reducingState.get());
+                       reducingState.add(17L);
+                       reducingState.add(11L);
+                       // AddingFunction sums the added values: 17 + 11
+                       assertEquals(28L, reducingState.get().longValue());
+
+                       // writes under "def" must not be visible under "abc"
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(reducingState.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertNull(reducingState.get());
+                       reducingState.add(1L);
+                       reducingState.add(2L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertEquals(28L, reducingState.get().longValue());
+                       state.clear();
+                       assertNull(reducingState.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       reducingState.add(3L);
+                       reducingState.add(2L);
+                       reducingState.add(1L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(reducingState.get());
+
+                       // "g" was never cleared in between: 1 + 2 + 3 + 2 + 1
+                       keyedBackend.setCurrentKey("g");
+                       assertEquals(9L, reducingState.get().longValue());
+                       state.clear();
+
+                       // make sure all lists / maps are cleared
+                       assertTrue(state.getStateTable().isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * Spreads values for several keys over three integer namespaces, merges namespaces
+        * 2 and 3 into namespace 1 for each key, and checks the value read from namespace 1
+        * afterwards. Finally clears namespace 1 for every key and expects an empty state table.
+        */
+       @Test
+       public void testMerging() throws Exception {
+
+               final ReducingStateDescriptor<Long> stateDescr = new 
ReducingStateDescriptor<>(
+                               "my-state", new AddingFunction(), Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final Integer namespace1 = 1;
+               final Integer namespace2 = 2;
+               final Integer namespace3 = 3;
+
+               // 11 + 22 + 33 + 44 + 55: every non-empty key below is given exactly these values
+               final Long expectedResult = 165L;
+
+               final AsyncHeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       final ReducingState<Long> reducingState =
+                                       
keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
+
+                       // same object, viewed through the heap-state type to reach
+                       // setCurrentNamespace(), mergeNamespaces() and getStateTable()
+                       AbstractHeapMergingState<Long, Integer, ?, ?, ?, ?, ?> 
state =
+                               (AbstractHeapMergingState<Long, Integer, ?, ?, 
?, ?, ?>) reducingState;
+
+                       // populate the different namespaces
+                       //  - abc spreads the values over three namespaces
+                       //  - def spreads the values over two namespaces (one 
empty)
+                       //  - ghi is empty
+                       //  - jkl has all elements already in the target 
namespace
+                       //  - mno has all elements already in one source 
namespace
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       reducingState.add(33L);
+                       reducingState.add(55L);
+
+                       state.setCurrentNamespace(namespace2);
+                       reducingState.add(22L);
+                       reducingState.add(11L);
+
+                       state.setCurrentNamespace(namespace3);
+                       reducingState.add(44L);
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       reducingState.add(11L);
+                       reducingState.add(44L);
+
+                       state.setCurrentNamespace(namespace3);
+                       reducingState.add(22L);
+                       reducingState.add(55L);
+                       reducingState.add(33L);
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       reducingState.add(11L);
+                       reducingState.add(22L);
+                       reducingState.add(33L);
+                       reducingState.add(44L);
+                       reducingState.add(55L);
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace3);
+                       reducingState.add(11L);
+                       reducingState.add(22L);
+                       reducingState.add(33L);
+                       reducingState.add(44L);
+                       reducingState.add(55L);
+
+                       // merge per key, then read the result from namespace1
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, reducingState.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, reducingState.get());
+
+                       // "ghi" never received a value, so the merged result stays null
+                       keyedBackend.setCurrentKey("ghi");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertNull(reducingState.get());
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, reducingState.get());
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, reducingState.get());
+
+                       // make sure all lists / maps are cleared
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("ghi");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       // only namespace1 was cleared per key; an empty table therefore also
+                       // shows that nothing is left behind in namespace2 / namespace3
+                       assertTrue(state.getStateTable().isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test functions
+       // 
------------------------------------------------------------------------
+
+       /** Reduce function that combines two values by adding them. */
+       @SuppressWarnings("serial")
+       private static class AddingFunction implements ReduceFunction<Long> {
+
+               @Override
+               public Long reduce(Long a, Long b)  {
+                       final long sum = a.longValue() + b.longValue();
+                       return sum;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java
new file mode 100644
index 0000000..0bb3775
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Base class for the heap state tests; supplies a factory method for a keyed
+ * state backend with {@code String} keys.
+ */
+public abstract class HeapStateBackendTestBase {
+
+       /**
+        * Builds a new {@link AsyncHeapKeyedStateBackend} with string keys, a mocked
+        * {@link TaskKvStateRegistry} and the key-group range 0 to 15.
+        *
+        * @return a freshly created backend; the caller is expected to close and dispose it
+        */
+       public AsyncHeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+               final TaskKvStateRegistry kvStateRegistry = mock(TaskKvStateRegistry.class);
+               final ClassLoader classLoader = HeapStateBackendTestBase.class.getClassLoader();
+               final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 15);
+
+               // NOTE(review): 16 is presumably the total number of key groups,
+               // matching the 0..15 range -- confirm against the constructor.
+               return new AsyncHeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               StringSerializer.INSTANCE,
+                               classLoader,
+                               16,
+                               keyGroupRange);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
new file mode 100644
index 0000000..291f3ed
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO.
+ *
+ * <p>Every stream created by this factory can be made to wait on a "blocker" latch inside
+ * {@code write(int)} and triggers a "waiter" latch so that a test can observe the write.
+ * The latches and the invocation count are read once, when a stream is created; later
+ * calls to the setters only affect streams created afterwards.
+ */
+@VisibleForTesting
+@Internal
+public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+       /** Value handed to the constructor of every created stream. */
+       private final int maxSize;
+
+       /** Initial countdown copied into each newly created stream. */
+       private volatile int afterNumberInvocations;
+
+       /** Latch a created stream waits on once its countdown is at zero; may be null. */
+       private volatile OneShotLatch blocker;
+
+       /** Latch a created stream triggers from write/close; may be null. */
+       private volatile OneShotLatch waiter;
+
+       /** The stream returned by the most recent call to the create method. */
+       MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+       /**
+        * @return the stream created by the most recent call to
+        *         {@link #createCheckpointStateOutputStream(long, long)}, or null if none was created
+        */
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+               return lastCreatedStream;
+       }
+
+       /**
+        * @param maxSize value passed to every created {@code MemoryCheckpointOutputStream}
+        */
+       public BlockerCheckpointStreamFactory(int maxSize) {
+               this.maxSize = maxSize;
+       }
+
+       /**
+        * Sets the countdown for streams created afterwards. A stream decrements its copy once
+        * per {@code write(int)} while it is positive and waits on the blocker latch in every
+        * write that finds the countdown at zero.
+        */
+       public void setAfterNumberInvocations(int afterNumberInvocations) {
+               this.afterNumberInvocations = afterNumberInvocations;
+       }
+
+       /** Sets the latch that streams created afterwards wait on. */
+       public void setBlockerLatch(OneShotLatch latch) {
+               this.blocker = latch;
+       }
+
+       /** Sets the latch that streams created afterwards trigger. */
+       public void setWaiterLatch(OneShotLatch latch) {
+               this.waiter = latch;
+       }
+
+       /**
+        * Creates a stream that snapshots the current countdown and latches of this factory
+        * and remembers it as the last created stream. Both arguments are unused.
+        */
+       @Override
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+               this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+                       // snapshot of the factory settings at creation time
+                       private int afterNInvocations = afterNumberInvocations;
+                       private final OneShotLatch streamBlocker = blocker;
+                       private final OneShotLatch streamWaiter = waiter;
+
+                       @Override
+                       public void write(int b) throws IOException {
+
+                               // Use the latch captured for this stream rather than the factory's
+                               // mutable field, so a latch installed later is never triggered by
+                               // a stream that was created before it.
+                               if (null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+
+                               if (afterNInvocations > 0) {
+                                       --afterNInvocations;
+                               }
+
+                               if (0 == afterNInvocations && null != streamBlocker) {
+                                       try {
+                                               streamBlocker.await();
+                                       } catch (InterruptedException ignored) {
+                                               // interruption is ignored here; the write below still runs
+                                       }
+                               }
+                               try {
+                                       super.write(b);
+                               } catch (IOException ex) {
+                                       // signal the waiter before propagating the failure
+                                       if (null != streamWaiter) {
+                                               streamWaiter.trigger();
+                                       }
+                                       throw ex;
+                               }
+
+                               if (0 == afterNInvocations && null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+
+                       @Override
+                       public void close() {
+                               super.close();
+                               if (null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+               };
+
+               return lastCreatedStream;
+       }
+
+       /** No-op; this factory holds nothing that needs closing. */
+       @Override
+       public void close() throws Exception {
+
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0d5d091..a1adda1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.util.MathUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,7 +36,7 @@ import java.util.Set;
 
 /**
  * A {@link Window} that represents a time interval from {@code start} 
(inclusive) to
- * {@code start + size} (exclusive).
+ * {@code end} (exclusive).
  */
 @PublicEvolving
 public class TimeWindow extends Window {
@@ -48,14 +49,35 @@ public class TimeWindow extends Window {
                this.end = end;
        }
 
+       /**
+        * Gets the starting timestamp of the window. This is the first 
timestamp that belongs
+        * to this window.
+        *
+        * @return The starting timestamp of this window.
+        */
        public long getStart() {
                return start;
        }
 
+       /**
+        * Gets the end timestamp of this window. The end timestamp is 
exclusive, meaning it
+        * is the first timestamp that does not belong to this window any more.
+        *
+        * @return The exclusive end timestamp of this window.
+        */
        public long getEnd() {
                return end;
        }
 
+       /**
+        * Gets the largest timestamp that still belongs to this window.
+        *
+        * <p>This timestamp is identical to {@code getEnd() - 1}.
+        *
+        * @return The largest timestamp that still belongs to this window.
+        *
+        * @see #getEnd()
+        */
        @Override
        public long maxTimestamp() {
                return end - 1;
@@ -77,17 +99,15 @@ public class TimeWindow extends Window {
 
        @Override
        public int hashCode() {
-               int result = (int) (start ^ (start >>> 32));
-               result = 31 * result + (int) (end ^ (end >>> 32));
-               return result;
+               return MathUtils.longToIntWithBitMixing(start + end);
        }
 
        @Override
        public String toString() {
                return "TimeWindow{" +
-                               "start=" + start +
-                               ", end=" + end +
-                               '}';
+                       "start=" + start +
+                       ", end=" + end +
+                       '}';
        }
 
        /**
@@ -104,6 +124,13 @@ public class TimeWindow extends Window {
                return new TimeWindow(Math.min(start, other.start), 
Math.max(end, other.end));
        }
 
+       // 
------------------------------------------------------------------------
+       // Serializer
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The serializer used to write the TimeWindow type.
+        */
        public static class Serializer extends TypeSerializer<TimeWindow> {
                private static final long serialVersionUID = 1L;
 
@@ -152,9 +179,7 @@ public class TimeWindow extends Window {
 
                @Override
                public TimeWindow deserialize(TimeWindow reuse, DataInputView 
source) throws IOException {
-                       long start = source.readLong();
-                       long end = source.readLong();
-                       return new TimeWindow(start, end);
+                       return deserialize(source);
                }
 
                @Override
@@ -179,6 +204,10 @@ public class TimeWindow extends Window {
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
        /**
         * Merge overlapping {@link TimeWindow}s. For use by merging
         * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner 
WindowAssigners}.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index ee417ac..b9028c8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -32,7 +32,9 @@ import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.async.AsyncMemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -91,7 +93,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC
+               MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
        }
 
        @BeforeClass
@@ -115,12 +117,18 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        @Before
        public void initStateBackend() throws IOException {
                switch (stateBackendEnum) {
+                       case MEM_ASYNC:
+                               this.stateBackend = new 
AsyncMemoryStateBackend(MAX_MEM_STATE_SIZE);
+                               break;
+                       case FILE_ASYNC: {
+                               this.stateBackend = new 
AsyncFsStateBackend(tempFolder.newFolder().toURI());
+                               break;
+                       }
                        case MEM:
                                this.stateBackend = new 
MemoryStateBackend(MAX_MEM_STATE_SIZE);
                                break;
                        case FILE: {
-                               String backups = 
tempFolder.newFolder().getAbsolutePath();
-                               this.stateBackend = new 
FsStateBackend("file://" + backups);
+                               this.stateBackend = new 
FsStateBackend(tempFolder.newFolder().toURI());
                                break;
                        }
                        case ROCKSDB_FULLY_ASYNC: {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..a5bf10c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+/**
+ * Runs the tests inherited from {@link AbstractEventTimeWindowCheckpointingITCase}
+ * with the {@code FILE_ASYNC} state backend selection.
+ */
+public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
+
+       // selects the FILE_ASYNC branch of the base class' state backend setup
+       public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.FILE_ASYNC);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..ef9ad37
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+/**
+ * Runs the tests inherited from {@link AbstractEventTimeWindowCheckpointingITCase}
+ * with the {@code MEM_ASYNC} state backend selection.
+ */
+public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
+
+       // selects the MEM_ASYNC branch of the base class' state backend setup
+       public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.MEM_ASYNC);
+       }
+}

Reply via email to