Repository: flink
Updated Branches:
  refs/heads/master 336b95d4e -> 6bdaf1e4a


[FLINK-6048] [checkpoint] Implement asynchronous snapshots for 
DefaultOperatorStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bdaf1e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bdaf1e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bdaf1e4

Branch: refs/heads/master
Commit: 6bdaf1e4a5db1c1609cbd7f96b20aab285877535
Parents: 336b95d
Author: Stefan Richter <[email protected]>
Authored: Tue Mar 14 14:25:57 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Wed Apr 26 19:54:17 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   2 +-
 .../streaming/state/RocksDBStateBackend.java    |  17 +-
 .../state/RocksDBStateBackendTest.java          |   2 +-
 .../AbstractAsyncSnapshotIOCallable.java        | 109 +++++++
 .../runtime/state/AbstractStateBackend.java     |   7 +-
 .../state/DefaultOperatorStateBackend.java      | 220 +++++++++++---
 .../runtime/state/OperatorStateHandle.java      |   6 +-
 .../state/filesystem/FsStateBackend.java        |  13 +
 .../state/heap/HeapKeyedStateBackend.java       | 107 +++----
 .../state/memory/MemoryStateBackend.java        |  13 +
 .../BlockerCheckpointStreamFactory.java         | 112 +++++++
 .../runtime/state/OperatorStateBackendTest.java | 301 +++++++++++++++++--
 .../runtime/tasks/BlockingCheckpointsTest.java  |  10 +
 .../streaming/runtime/StateBackendITCase.java   |   9 +
 14 files changed, 787 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 91d947e..199a5a4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -421,7 +421,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 */
                public void closeCheckpointStream() throws IOException {
                        if (outStream != null) {
-                               
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
                                snapshotResultStateHandle = 
closeSnapshotStreamAndGetHandle();
                        } else {
                                snapshotResultStateHandle = null;
@@ -592,6 +591,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() 
throws IOException {
+                       
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
                        StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
                        outStream = null;
                        return stateHandle != null ? new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 16b0869..80c9a29 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -26,15 +26,15 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
-
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -427,6 +427,19 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
+       public OperatorStateBackend createOperatorStateBackend(
+               Environment env,
+               String operatorIdentifier) throws Exception {
+
+               //the default for RocksDB; eventually there can be an operator 
state backend based on RocksDB, too.
+               final boolean asyncSnapshots = true;
+               return new DefaultOperatorStateBackend(
+                       env.getUserClassLoader(),
+                       env.getExecutionConfig(),
+                       asyncSnapshots);
+       }
+
+       @Override
        public String toString() {
                return "RocksDB State Backend {" +
                        "isInitialized=" + isInitialized +

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index d95a9b4..b5f18a4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
new file mode 100644
index 0000000..1aaa473
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Abstract base class for async IO operations of snapshots against a
+ * {@link CheckpointStreamFactory.CheckpointStateOutputStream}. This includes participating in 
lifecycle management
+ * through a {@link CloseableRegistry}.
+ */
+public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject>
+       extends AbstractAsyncIOCallable<H, 
CheckpointStreamFactory.CheckpointStateOutputStream> {
+
+       protected final  long checkpointId;
+       protected final  long timestamp;
+
+       protected final CheckpointStreamFactory streamFactory;
+       protected final CloseableRegistry closeStreamOnCancelRegistry;
+       protected final AtomicBoolean open;
+
+       public AbstractAsyncSnapshotIOCallable(
+               long checkpointId,
+               long timestamp,
+               CheckpointStreamFactory streamFactory,
+               CloseableRegistry closeStreamOnCancelRegistry) {
+
+               this.streamFactory = Preconditions.checkNotNull(streamFactory);
+               this.closeStreamOnCancelRegistry = 
Preconditions.checkNotNull(closeStreamOnCancelRegistry);
+               this.checkpointId = checkpointId;
+               this.timestamp = timestamp;
+               this.open = new AtomicBoolean(false);
+       }
+
+       @Override
+       public CheckpointStreamFactory.CheckpointStateOutputStream 
openIOHandle() throws Exception {
+               if (checkStreamClosedAndDoTransitionToOpen()) {
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream =
+                               
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+                       try {
+                               
closeStreamOnCancelRegistry.registerClosable(stream);
+                               return stream;
+                       } catch (Exception ex) {
+                               open.set(false);
+                               throw ex;
+                       }
+               } else {
+                       throw new IOException("Async snapshot: a checkpoint 
stream was already opened.");
+               }
+       }
+
+       @Override
+       public void done(boolean canceled) {
+               if (checkStreamOpenAndDoTransitionToClose()) {
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream = getIoHandle();
+                       if (stream != null) {
+                               
closeStreamOnCancelRegistry.unregisterClosable(stream);
+                               IOUtils.closeQuietly(stream);
+                       }
+               }
+       }
+
+       protected boolean checkStreamClosedAndDoTransitionToOpen() {
+               return open.compareAndSet(false, true);
+       }
+
+       protected boolean checkStreamOpenAndDoTransitionToClose() {
+               return open.compareAndSet(true, false);
+       }
+
+       protected StreamStateHandle closeStreamAndGetStateHandle() throws 
IOException {
+               if (checkStreamOpenAndDoTransitionToClose()) {
+                       final 
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+                       try {
+                               return stream.closeAndGetHandle();
+                       } finally {
+                               
closeStreamOnCancelRegistry.unregisterClosable(stream);
+                       }
+               } else {
+                       throw new IOException("Checkpoint stream already 
closed.");
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 74025bf..1594e2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -91,12 +91,9 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
                        TaskKvStateRegistry kvStateRegistry) throws IOException;
 
        @Override
-       public OperatorStateBackend createOperatorStateBackend(
+       public abstract OperatorStateBackend createOperatorStateBackend(
                        Environment env,
-                       String operatorIdentifier) throws Exception {
-
-               return new 
DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig());
-       }
+                       String operatorIdentifier) throws Exception;
 
        // 
------------------------------------------------------------------------
        //  Loading the state backend from a configuration 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eb3ba01..e7ed26f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.state;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -31,8 +31,12 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -50,21 +54,54 @@ import java.util.concurrent.RunnableFuture;
 @Internal
 public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
-       /** The default namespace for state in cases where no state name is 
provided */
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
+
+       /**
+        * The default namespace for state in cases where no state name is 
provided
+        */
        public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
-       
+
+       /**
+        * Map for all registered operator states. Maps state name -> state
+        */
        private final Map<String, PartitionableListState<?>> registeredStates;
+
+       /**
+        * CloseableRegistry to participate in the task's lifecycle.
+        */
        private final CloseableRegistry closeStreamOnCancelRegistry;
+
+       /**
+        * Default serializer. Only used for the default operator state.
+        */
        private final JavaSerializer<Serializable> javaSerializer;
+
+       /**
+        * The user code classloader.
+        */
        private final ClassLoader userClassloader;
+
+       /**
+        * The execution configuration.
+        */
        private final ExecutionConfig executionConfig;
 
-       public DefaultOperatorStateBackend(ClassLoader userClassLoader, 
ExecutionConfig executionConfig) throws IOException {
+       /**
+        * Flag to de/activate asynchronous snapshots.
+        */
+       private final boolean asynchronousSnapshots;
+
+       public DefaultOperatorStateBackend(
+               ClassLoader userClassLoader,
+               ExecutionConfig executionConfig,
+               boolean asynchronousSnapshots) throws IOException {
+
                this.closeStreamOnCancelRegistry = new CloseableRegistry();
                this.userClassloader = 
Preconditions.checkNotNull(userClassLoader);
                this.executionConfig = executionConfig;
                this.javaSerializer = new JavaSerializer<>();
                this.registeredStates = new HashMap<>();
+               this.asynchronousSnapshots = asynchronousSnapshots;
        }
 
        public ExecutionConfig getExecutionConfig() {
@@ -131,59 +168,109 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointStreamFactory streamFactory,
-                       CheckpointOptions checkpointOptions) throws Exception {
+                       final long checkpointId,
+                       final long timestamp,
+                       final CheckpointStreamFactory streamFactory,
+                       final CheckpointOptions checkpointOptions) throws 
Exception {
+
+               final long syncStartTime = System.currentTimeMillis();
 
                if (registeredStates.isEmpty()) {
                        return DoneFuture.nullValue();
                }
 
-               List<OperatorBackendSerializationProxy.StateMetaInfo<?>> 
metaInfoList =
-                               new ArrayList<>(registeredStates.size());
-
-               for (Map.Entry<String, PartitionableListState<?>> entry : 
registeredStates.entrySet()) {
-                       PartitionableListState<?> state = entry.getValue();
-                       OperatorBackendSerializationProxy.StateMetaInfo<?> 
metaInfo =
-                                       new 
OperatorBackendSerializationProxy.StateMetaInfo<>(
-                                                       state.getName(),
-                                                       
state.getPartitionStateSerializer(),
-                                                       
state.getAssignmentMode());
-                       metaInfoList.add(metaInfo);
+               final Map<String, PartitionableListState<?>> 
registeredStatesDeepCopies =
+                               new HashMap<>(registeredStates.size());
+
+               // eagerly create deep copies of the list states in the sync 
phase, so that we can use them in the async writing
+               for (Map.Entry<String, PartitionableListState<?>> entry : 
this.registeredStates.entrySet()) {
+
+                       PartitionableListState<?> listState = entry.getValue();
+                       if (null != listState) {
+                               listState = listState.deepCopy();
+                       }
+                       registeredStatesDeepCopies.put(entry.getKey(), 
listState);
                }
 
-               Map<String, OperatorStateHandle.StateMetaInfo> 
writtenStatesMetaData = new HashMap<>(registeredStates.size());
+               // implementation of the async IO operation, based on FutureTask
+               final AbstractAsyncSnapshotIOCallable<OperatorStateHandle> 
ioCallable =
+                       new 
AbstractAsyncSnapshotIOCallable<OperatorStateHandle>(
+                               checkpointId,
+                               timestamp,
+                               streamFactory,
+                               closeStreamOnCancelRegistry) {
+
+                               @Override
+                               public OperatorStateHandle performOperation() 
throws Exception {
+                                       long asyncStartTime = 
System.currentTimeMillis();
+
+                                       final Map<String, 
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
+                                               new 
HashMap<>(registeredStatesDeepCopies.size());
+
+                                       
List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+                                               new 
ArrayList<>(registeredStatesDeepCopies.size());
+
+                                       for (Map.Entry<String, 
PartitionableListState<?>> entry :
+                                               
registeredStatesDeepCopies.entrySet()) {
+
+                                               PartitionableListState<?> state 
= entry.getValue();
+                                               
OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+                                                       new 
OperatorBackendSerializationProxy.StateMetaInfo<>(
+                                                               state.getName(),
+                                                               
state.getPartitionStateSerializer(),
+                                                               
state.getAssignmentMode());
+                                               metaInfoList.add(metaInfo);
+                                       }
 
-               CheckpointStreamFactory.CheckpointStateOutputStream out = 
streamFactory.
-                               createCheckpointStateOutputStream(checkpointId, 
timestamp);
+                                       
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
+                                       DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
 
-               try {
-                       closeStreamOnCancelRegistry.registerClosable(out);
+                                       OperatorBackendSerializationProxy 
backendSerializationProxy =
+                                               new 
OperatorBackendSerializationProxy(metaInfoList);
 
-                       DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
+                                       backendSerializationProxy.write(dov);
+
+                                       
dov.writeInt(registeredStatesDeepCopies.size());
 
-                       OperatorBackendSerializationProxy 
backendSerializationProxy =
-                                       new 
OperatorBackendSerializationProxy(metaInfoList);
+                                       for (Map.Entry<String, 
PartitionableListState<?>> entry :
+                                               
registeredStatesDeepCopies.entrySet()) {
 
-                       backendSerializationProxy.write(dov);
+                                               PartitionableListState<?> value 
= entry.getValue();
+                                               long[] partitionOffsets = 
value.write(out);
+                                               OperatorStateHandle.Mode mode = 
value.getAssignmentMode();
+                                               writtenStatesMetaData.put(
+                                                       entry.getKey(),
+                                                       new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
+                                       }
 
-                       dov.writeInt(registeredStates.size());
-                       for (Map.Entry<String, PartitionableListState<?>> entry 
: registeredStates.entrySet()) {
+                                       StreamStateHandle stateHandle = 
closeStreamAndGetStateHandle();
 
-                               PartitionableListState<?> value = 
entry.getValue();
-                               long[] partitionOffsets = value.write(out);
-                               OperatorStateHandle.Mode mode = 
value.getAssignmentMode();
-                               writtenStatesMetaData.put(entry.getKey(), new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
-                       }
+                                       if (asynchronousSnapshots) {
+                                               
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in 
thread {} took {} ms.",
+                                                       streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+                                       }
+
+                                       if (stateHandle == null) {
+                                               return null;
+                                       }
 
-                       OperatorStateHandle handle = new 
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
+                                       OperatorStateHandle operatorStateHandle 
=
+                                               new 
OperatorStateHandle(writtenStatesMetaData, stateHandle);
 
-                       return new DoneFuture<>(handle);
-               } finally {
-                       closeStreamOnCancelRegistry.unregisterClosable(out);
-                       out.close();
+                                       return operatorStateHandle;
+                               }
+                       };
+
+               AsyncStoppableTaskWithCallback<OperatorStateHandle> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
+
+               if (!asynchronousSnapshots) {
+                       task.run();
                }
+
+               LOG.info("DefaultOperatorStateBackend snapshot (" + 
streamFactory + ", synchronous part) in thread " +
+                               Thread.currentThread() + " took " + 
(System.currentTimeMillis() - syncStartTime) + " ms.");
+
+               return task;
        }
 
        @Override
@@ -253,22 +340,67 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                }
        }
 
+       /**
+        *
+        * Implementation of operator list state.
+        *
+        * @param <S> the type of an operator state partition.
+        */
        static final class PartitionableListState<S> implements ListState<S> {
 
+               /**
+                * The name of the state, as registered by the user
+                */
                private final String name;
+
+               /**
+                * The type serializer for the elements in the state list
+                */
                private final TypeSerializer<S> partitionStateSerializer;
+
+               /**
+                * The mode how elements in this state are assigned to tasks 
during restore
+                */
                private final OperatorStateHandle.Mode assignmentMode;
-               private final List<S> internalList;
+
+               /**
+                * The internal list that holds the elements of the state
+                */
+               private final ArrayList<S> internalList;
+
+               /**
+                * A serializer that allows performing deep copies of 
internalList
+                */
+               private final ArrayListSerializer<S> internalListCopySerializer;
 
                public PartitionableListState(
                                String name,
                                TypeSerializer<S> partitionStateSerializer,
                                OperatorStateHandle.Mode assignmentMode) {
 
+                       this(name, partitionStateSerializer, assignmentMode, 
new ArrayList<S>());
+               }
+
+               private PartitionableListState(
+                               String name,
+                               TypeSerializer<S> partitionStateSerializer,
+                               OperatorStateHandle.Mode assignmentMode,
+                               ArrayList<S> internalList) {
+
                        this.name = Preconditions.checkNotNull(name);
                        this.partitionStateSerializer = 
Preconditions.checkNotNull(partitionStateSerializer);
                        this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
-                       this.internalList = new ArrayList<>();
+                       this.internalList = 
Preconditions.checkNotNull(internalList);
+                       this.internalListCopySerializer = new 
ArrayListSerializer<>(partitionStateSerializer);
+               }
+
+               private PartitionableListState(PartitionableListState<S> 
toCopy) {
+
+                       this(
+                                       toCopy.name,
+                                       
toCopy.partitionStateSerializer.duplicate(),
+                                       toCopy.assignmentMode,
+                                       
toCopy.internalListCopySerializer.copy(toCopy.internalList));
                }
 
                public String getName() {
@@ -287,6 +419,10 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        return internalList;
                }
 
+               public PartitionableListState<S> deepCopy() {
+                       return new PartitionableListState<>(this);
+               }
+
                @Override
                public void clear() {
                        internalList.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
index c59fbad..7c49338 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -32,8 +32,12 @@ import java.util.Map;
  */
 public class OperatorStateHandle implements StreamStateHandle {
 
+       /**
+        * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
+        */
        public enum Mode {
-               SPLIT_DISTRIBUTE, BROADCAST
+               SPLIT_DISTRIBUTE, // The operator state partitions in the state 
handle are split and distributed to one task each.
+               BROADCAST // The operator state partitions are broadcast to 
all tasks.
        }
 
        private static final long serialVersionUID = 35876522969227335L;

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e27712c..e320bf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -310,6 +312,17 @@ public class FsStateBackend extends AbstractStateBackend {
        }
 
        @Override
+       public OperatorStateBackend createOperatorStateBackend(
+               Environment env,
+               String operatorIdentifier) throws Exception {
+
+               return new DefaultOperatorStateBackend(
+                       env.getUserClassLoader(),
+                       env.getExecutionConfig(),
+                       asynchronousSnapshots);
+       }
+
+       @Override
        public String toString() {
                return "File State Backend @ " + basePath;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 9247ffa..38817cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -37,10 +37,10 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import 
org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -72,7 +72,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and 
will serialize state to
@@ -269,78 +268,50 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                //--------------------------------------------------- this 
becomes the end of sync part
 
                // implementation of the async IO operation, based on FutureTask
-               final AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
-                               new AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream>() {
-
-                                       AtomicBoolean open = new 
AtomicBoolean(false);
-
-                                       @Override
-                                       public 
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws 
Exception {
-                                               if (open.compareAndSet(false, 
true)) {
-                                                       
CheckpointStreamFactory.CheckpointStateOutputStream stream =
-                                                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-                                                       try {
-                                                               
cancelStreamRegistry.registerClosable(stream);
-                                                               return stream;
-                                                       } catch (Exception ex) {
-                                                               open.set(false);
-                                                               throw ex;
-                                                       }
-                                               } else {
-                                                       throw new 
IOException("Operation already opened.");
+               final AbstractAsyncSnapshotIOCallable<KeyedStateHandle> 
ioCallable =
+                       new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>(
+                               checkpointId,
+                               timestamp,
+                               streamFactory,
+                               cancelStreamRegistry) {
+
+                               @Override
+                               public KeyGroupsStateHandle performOperation() 
throws Exception {
+                                       long asyncStartTime = 
System.currentTimeMillis();
+                                       
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(stream);
+                                       serializationProxy.write(outView);
+
+                                       long[] keyGroupRangeOffsets = new 
long[keyGroupRange.getNumberOfKeyGroups()];
+
+                                       for (int keyGroupPos = 0; keyGroupPos < 
keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+                                               int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
+                                               
keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+                                               outView.writeInt(keyGroupId);
+
+                                               for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+                                                       
outView.writeShort(kVStateToId.get(kvState.getKey()));
+                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView,
 keyGroupId);
                                                }
                                        }
 
-                                       @Override
-                                       public KeyGroupsStateHandle 
performOperation() throws Exception {
-                                               long asyncStartTime = 
System.currentTimeMillis();
-                                               
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-                                               DataOutputViewStreamWrapper 
outView = new DataOutputViewStreamWrapper(stream);
-                                               
serializationProxy.write(outView);
-
-                                               long[] keyGroupRangeOffsets = 
new long[keyGroupRange.getNumberOfKeyGroups()];
-
-                                               for (int keyGroupPos = 0; 
keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
-                                                       int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
-                                                       
keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
-                                                       
outView.writeInt(keyGroupId);
-
-                                                       for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-                                                               
outView.writeShort(kVStateToId.get(kvState.getKey()));
-                                                               
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView,
 keyGroupId);
-                                                       }
-                                               }
-
-                                               if (open.compareAndSet(true, 
false)) {
-                                                       StreamStateHandle 
streamStateHandle = stream.closeAndGetHandle();
-                                                       KeyGroupRangeOffsets 
offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-                                                       final 
KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, 
streamStateHandle);
+                                       final StreamStateHandle 
streamStateHandle = closeStreamAndGetStateHandle();
 
-                                                       if 
(asynchronousSnapshots) {
-                                                               LOG.info("Heap 
backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
-                                                                               
streamFactory, Thread.currentThread(), (System.currentTimeMillis() - 
asyncStartTime));
-                                                       }
-
-                                                       return 
keyGroupsStateHandle;
-                                               } else {
-                                                       throw new 
IOException("Checkpoint stream already closed.");
-                                               }
+                                       if (asynchronousSnapshots) {
+                                               LOG.info("Heap backend snapshot 
({}, asynchronous part) in thread {} took {} ms.",
+                                                       streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
                                        }
 
-                                       @Override
-                                       public void done(boolean canceled) {
-                                               if (open.compareAndSet(true, 
false)) {
-                                                       
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-                                                       if (null != stream) {
-                                                               
cancelStreamRegistry.unregisterClosable(stream);
-                                                               
IOUtils.closeQuietly(stream);
-                                                       }
-                                               }
-                                               for (StateTableSnapshot 
snapshot : cowStateStableSnapshots.values()) {
-                                                       snapshot.release();
-                                               }
+                                       if (streamStateHandle == null) {
+                                               return null;
                                        }
-                               };
+
+                                       KeyGroupRangeOffsets offsets = new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+                                       final KeyGroupsStateHandle 
keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+                                       return keyGroupsStateHandle;
+                               }
+                       };
 
                AsyncStoppableTaskWithCallback<KeyedStateHandle> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index f0bac1b..7ed1dea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -25,7 +25,9 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
 import java.io.IOException;
@@ -89,6 +91,17 @@ public class MemoryStateBackend extends AbstractStateBackend 
{
        }
 
        @Override
+       public OperatorStateBackend createOperatorStateBackend(
+               Environment env,
+               String operatorIdentifier) throws Exception {
+
+               return new DefaultOperatorStateBackend(
+                       env.getUserClassLoader(),
+                       env.getExecutionConfig(),
+                       asynchronousSnapshots);
+       }
+
+       @Override
        public String toString() {
                return "MemoryStateBackend (data in heap memory / checkpoints 
to JobManager)";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
new file mode 100644
index 0000000..6f892e2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link CheckpointStreamFactory} for tests that creates streams that block 
on a latch to test concurrency in
+ * checkpointing.
+ */
+public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory 
{
+
+       private final int maxSize;
+       private int afterNumberInvocations;
+       private OneShotLatch blocker;
+       private OneShotLatch waiter;
+
+       MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
lastCreatedStream;
+
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
getLastCreatedStream() {
+               return lastCreatedStream;
+       }
+
+       public BlockerCheckpointStreamFactory(int maxSize) {
+               this.maxSize = maxSize;
+       }
+
+       public void setAfterNumberInvocations(int afterNumberInvocations) {
+               this.afterNumberInvocations = afterNumberInvocations;
+       }
+
+       public void setBlockerLatch(OneShotLatch latch) {
+               this.blocker = latch;
+       }
+
+       public void setWaiterLatch(OneShotLatch latch) {
+               this.waiter = latch;
+       }
+
+       @Override
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
+               waiter.trigger();
+               this.lastCreatedStream = new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+                       private int afterNInvocations = afterNumberInvocations;
+                       private final OneShotLatch streamBlocker = blocker;
+                       private final OneShotLatch streamWaiter = waiter;
+
+                       @Override
+                       public void write(int b) throws IOException {
+
+                               if (afterNInvocations > 0) {
+                                       --afterNInvocations;
+                               }
+
+                               if (0 == afterNInvocations && null != 
streamBlocker) {
+                                       try {
+                                               streamBlocker.await();
+                                       } catch (InterruptedException ignored) {
+                                       }
+                               }
+                               try {
+                                       super.write(b);
+                               } catch (IOException ex) {
+                                       if (null != streamWaiter) {
+                                               streamWaiter.trigger();
+                                       }
+                                       throw ex;
+                               }
+
+                               if (0 == afterNInvocations && null != 
streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+
+                       @Override
+                       public void close() {
+                               super.close();
+                               if (null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+               };
+
+               return lastCreatedStream;
+       }
+
+       @Override
+       public void close() throws Exception {
+
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 13a6307..c04ed8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,22 +21,30 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
 import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -75,7 +83,7 @@ public class OperatorStateBackendTest {
                final ExecutionConfig cfg = new ExecutionConfig();
                cfg.registerTypeWithKryoSerializer(registeredType, 
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
 
-               final OperatorStateBackend operatorStateBackend = new 
DefaultOperatorStateBackend(classLoader, cfg);
+               final OperatorStateBackend operatorStateBackend = new 
DefaultOperatorStateBackend(classLoader, cfg, false);
 
                ListStateDescriptor<File> stateDescriptor = new 
ListStateDescriptor<>("test", File.class);
                ListStateDescriptor<String> stateDescriptor2 = new 
ListStateDescriptor<>("test2", String.class);
@@ -108,7 +116,7 @@ public class OperatorStateBackendTest {
        @Test
        public void testRegisterStates() throws Exception {
                final OperatorStateBackend operatorStateBackend =
-                               new DefaultOperatorStateBackend(classLoader, 
new ExecutionConfig());
+                               new DefaultOperatorStateBackend(classLoader, 
new ExecutionConfig(), false);
 
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
@@ -117,19 +125,19 @@ public class OperatorStateBackendTest {
                assertNotNull(listState1);
                assertEquals(1, 
operatorStateBackend.getRegisteredStateNames().size());
                Iterator<Serializable> it = listState1.get().iterator();
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
                listState1.add(42);
                listState1.add(4711);
 
                it = listState1.get().iterator();
                assertEquals(42, it.next());
                assertEquals(4711, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                ListState<Serializable> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
                assertNotNull(listState2);
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
                listState2.add(7);
                listState2.add(13);
                listState2.add(23);
@@ -138,12 +146,12 @@ public class OperatorStateBackendTest {
                assertEquals(7, it.next());
                assertEquals(13, it.next());
                assertEquals(23, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                ListState<Serializable> listState3 = 
operatorStateBackend.getUnionListState(stateDescriptor3);
                assertNotNull(listState3);
                assertEquals(3, 
operatorStateBackend.getRegisteredStateNames().size());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
                listState3.add(17);
                listState3.add(3);
                listState3.add(123);
@@ -152,7 +160,7 @@ public class OperatorStateBackendTest {
                assertEquals(17, it.next());
                assertEquals(3, it.next());
                assertEquals(123, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                ListState<Serializable> listState1b = 
operatorStateBackend.getListState(stateDescriptor1);
                assertNotNull(listState1b);
@@ -161,19 +169,19 @@ public class OperatorStateBackendTest {
                assertEquals(42, it.next());
                assertEquals(4711, it.next());
                assertEquals(123, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                it = listState1.get().iterator();
                assertEquals(42, it.next());
                assertEquals(4711, it.next());
                assertEquals(123, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                it = listState1b.get().iterator();
                assertEquals(42, it.next());
                assertEquals(4711, it.next());
                assertEquals(123, it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
 
                try {
                        
operatorStateBackend.getUnionListState(stateDescriptor2);
@@ -208,12 +216,10 @@ public class OperatorStateBackendTest {
        }
 
        @Test
-       public void testSnapshotRestore() throws Exception {
+       public void testSnapshotRestoreSync() throws Exception {
                AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
 
-               OperatorStateBackend operatorStateBackend =
-                               
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"test-op-name");
-
+               OperatorStateBackend operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"test-op-name");
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -234,8 +240,9 @@ public class OperatorStateBackendTest {
                listState3.add(20);
 
                CheckpointStreamFactory streamFactory = 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
-               OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(
-                               operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                               operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forFullCheckpoint());
+               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
                try {
 
@@ -259,26 +266,278 @@ public class OperatorStateBackendTest {
                        Iterator<Serializable> it = listState1.get().iterator();
                        assertEquals(42, it.next());
                        assertEquals(4711, it.next());
-                       assertTrue(!it.hasNext());
+                       assertFalse(it.hasNext());
 
                        it = listState2.get().iterator();
                        assertEquals(7, it.next());
                        assertEquals(13, it.next());
                        assertEquals(23, it.next());
-                       assertTrue(!it.hasNext());
+                       assertFalse(it.hasNext());
 
                        it = listState3.get().iterator();
                        assertEquals(17, it.next());
                        assertEquals(18, it.next());
                        assertEquals(19, it.next());
                        assertEquals(20, it.next());
-                       assertTrue(!it.hasNext());
+                       assertFalse(it.hasNext());
+
+                       operatorStateBackend.close();
+                       operatorStateBackend.dispose();
+               } finally {
+                       stateHandle.discardState();
+               }
+       }
+
+       /**
+        * Checks that an asynchronous snapshot is isolated from concurrent state changes: the state is
+        * mutated while the snapshot is still being written, and the backend restored from that
+        * snapshot must only contain the values that were present when {@code snapshot()} was called.
+        */
+       @Test
+       public void testSnapshotRestoreAsync() throws Exception {
+               DefaultOperatorStateBackend operatorStateBackend =
+                               new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
+
+               ListStateDescriptor<MutableType> stateDescriptor1 =
+                               new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
+               ListStateDescriptor<MutableType> stateDescriptor2 =
+                               new ListStateDescriptor<>("test2", new JavaSerializer<MutableType>());
+               ListStateDescriptor<MutableType> stateDescriptor3 =
+                               new ListStateDescriptor<>("test3", new JavaSerializer<MutableType>());
+               ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
+               ListState<MutableType> listState2 = operatorStateBackend.getListState(stateDescriptor2);
+               ListState<MutableType> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+               listState1.add(MutableType.of(42));
+               listState1.add(MutableType.of(4711));
+
+               listState2.add(MutableType.of(7));
+               listState2.add(MutableType.of(13));
+               listState2.add(MutableType.of(23));
+
+               listState3.add(MutableType.of(17));
+               listState3.add(MutableType.of(18));
+               listState3.add(MutableType.of(19));
+               listState3.add(MutableType.of(20));
+
+               BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+
+               OneShotLatch waiterLatch = new OneShotLatch();
+               OneShotLatch blockerLatch = new OneShotLatch();
+
+               streamFactory.setWaiterLatch(waiterLatch);
+               streamFactory.setBlockerLatch(blockerLatch);
+
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                               operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+               ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+               // stays null until the snapshot future has completed successfully
+               OperatorStateHandle stateHandle = null;
+
+               try {
+                       executorService.submit(runnableFuture);
+
+                       // wait until the async checkpoint is in the write code, then continue
+                       waiterLatch.await();
+
+                       // do some mutations to the state, to test if our snapshot will NOT reflect them
+
+                       listState1.add(MutableType.of(77));
+
+                       int n = 0;
+
+                       for (MutableType mutableType : listState2.get()) {
+                               if (++n == 2) {
+                                       // allow the write code to continue, so that the remaining changes
+                                       // are made while the state is written in parallel
+                                       blockerLatch.trigger();
+                               }
+                               mutableType.setValue(mutableType.getValue() + 10);
+                       }
+
+                       listState3.clear();
+
+                       operatorStateBackend.getListState(
+                                       new ListStateDescriptor<>("test4", new JavaSerializer<MutableType>()));
+
+                       // the snapshot is already running on the executor thread; wait for its result,
+                       // but bounded, so that a stuck snapshot fails the test instead of hanging the build
+                       stateHandle = runnableFuture.get(60, TimeUnit.SECONDS);
+
+                       operatorStateBackend.close();
+                       operatorStateBackend.dispose();
+
+                       AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+                       // TODO the cast is temporary: it lets this test use functionality that is not yet
+                       // exposed through the public API
+                       operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
+                                       createMockEnvironment(),
+                                       "testOperator");
+
+                       operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+                       assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
+
+                       listState1 = operatorStateBackend.getListState(stateDescriptor1);
+                       listState2 = operatorStateBackend.getListState(stateDescriptor2);
+                       listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+                       assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
+
+                       Iterator<MutableType> it = listState1.get().iterator();
+                       assertEquals(42, it.next().value);
+                       assertEquals(4711, it.next().value);
+                       assertFalse(it.hasNext());
+
+                       it = listState2.get().iterator();
+                       assertEquals(7, it.next().value);
+                       assertEquals(13, it.next().value);
+                       assertEquals(23, it.next().value);
+                       assertFalse(it.hasNext());
+
+                       it = listState3.get().iterator();
+                       assertEquals(17, it.next().value);
+                       assertEquals(18, it.next().value);
+                       assertEquals(19, it.next().value);
+                       assertEquals(20, it.next().value);
+                       assertFalse(it.hasNext());
 
                        operatorStateBackend.close();
                        operatorStateBackend.dispose();
                } finally {
-                       stateHandle.discardState();
+                       // runs on every exit path, so the pool thread is released even when an assertion
+                       // fails; shutdownNow() also interrupts a snapshot task that is still running
+                       executorService.shutdownNow();
+
+                       if (stateHandle != null) {
+                               stateHandle.discardState();
+                       }
                }
+       }
+
+       /**
+        * Checks that closing the backend while an asynchronous snapshot is in its write phase makes
+        * the snapshot future fail with an {@link ExecutionException} whose cause is an
+        * {@link IOException}.
+        */
+       @Test
+       public void testSnapshotAsyncClose() throws Exception {
+               DefaultOperatorStateBackend operatorStateBackend =
+                               new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
+
+               ListStateDescriptor<MutableType> stateDescriptor1 =
+                               new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
+
+               ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+
+               listState1.add(MutableType.of(42));
+               listState1.add(MutableType.of(4711));
+
+               BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+
+               OneShotLatch waiterLatch = new OneShotLatch();
+               OneShotLatch blockerLatch = new OneShotLatch();
+
+               streamFactory.setWaiterLatch(waiterLatch);
+               streamFactory.setBlockerLatch(blockerLatch);
+
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                               operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+               ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+               try {
+                       executorService.submit(runnableFuture);
+
+                       // wait until the async checkpoint is in the write code, then continue
+                       waiterLatch.await();
+
+                       // close the backend first and only then let the blocked write continue
+                       operatorStateBackend.close();
+
+                       blockerLatch.trigger();
+
+                       try {
+                               runnableFuture.get(60, TimeUnit.SECONDS);
+                               Assert.fail("Snapshot was expected to fail because the backend was closed while it was written.");
+                       } catch (ExecutionException eex) {
+                               Assert.assertTrue(eex.getCause() instanceof IOException);
+                       }
+               } finally {
+                       // the pool uses a non-daemon thread; release it on every exit path
+                       executorService.shutdownNow();
+               }
+       }
+
+       /**
+        * Checks that cancelling the snapshot future while the asynchronous snapshot is in its write
+        * phase makes a subsequent {@code get()} throw a {@link CancellationException}.
+        */
+       @Test
+       public void testSnapshotAsyncCancel() throws Exception {
+               DefaultOperatorStateBackend operatorStateBackend =
+                               new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
+
+               ListStateDescriptor<MutableType> stateDescriptor1 =
+                               new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
+
+               ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+
+               listState1.add(MutableType.of(42));
+               listState1.add(MutableType.of(4711));
+
+               BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+
+               OneShotLatch waiterLatch = new OneShotLatch();
+               OneShotLatch blockerLatch = new OneShotLatch();
+
+               streamFactory.setWaiterLatch(waiterLatch);
+               streamFactory.setBlockerLatch(blockerLatch);
+
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                               operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+               ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+               try {
+                       executorService.submit(runnableFuture);
+
+                       // wait until the async checkpoint is in the write code, then continue
+                       waiterLatch.await();
+
+                       // cancel first and only then let the blocked write continue
+                       runnableFuture.cancel(true);
+
+                       blockerLatch.trigger();
+
+                       try {
+                               runnableFuture.get(60, TimeUnit.SECONDS);
+                               Assert.fail("Snapshot future was expected to be cancelled.");
+                       } catch (CancellationException expected) {
+                               // this is the outcome under test: get() on a cancelled future must throw
+                       }
+               } finally {
+                       // the pool uses a non-daemon thread; release it on every exit path
+                       executorService.shutdownNow();
+               }
+       }
+
+       /**
+        * Serializable holder of a single mutable {@code int}. The tests use it as a list state
+        * element whose content can be changed in place after it was added to the state.
+        */
+       static final class MutableType implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               private int value;
+
+               /** Creates an instance holding {@code 0}. */
+               public MutableType() {
+                       this(0);
+               }
+
+               /** Creates an instance holding the given value. */
+               public MutableType(int value) {
+                       this.value = value;
+               }
+
+               /** Factory shorthand for {@code new MutableType(value)}. */
+               static MutableType of(int value) {
+                       return new MutableType(value);
+               }
+
+               public int getValue() {
+                       return value;
+               }
+
+               public void setValue(int value) {
+                       this.value = value;
+               }
+
+               /** Two instances are equal when they have the same class and hold the same value. */
+               @Override
+               public boolean equals(Object o) {
+                       if (o == this) {
+                               return true;
+                       }
+
+                       return o != null
+                               && o.getClass() == getClass()
+                               && ((MutableType) o).value == value;
+               }
+
+               /** The held value doubles as the hash code. */
+               @Override
+               public int hashCode() {
+                       return value;
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index e266ea1..bf5be79 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -53,7 +53,9 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -187,6 +189,14 @@ public class BlockingCheckpointsTest {
 
                        throw new UnsupportedOperationException();
                }
+
+               /**
+                * Creates a fresh {@link DefaultOperatorStateBackend} that uses this class' class loader
+                * and a default {@link ExecutionConfig}. Both arguments are ignored.
+                */
+               @Override
+               public OperatorStateBackend createOperatorStateBackend(
+                               Environment env,
+                               String operatorIdentifier) throws Exception {
+
+                       final ClassLoader classLoader = getClass().getClassLoader();
+                       final ExecutionConfig executionConfig = new ExecutionConfig();
+
+                       // NOTE(review): the trailing 'true' presumably switches on asynchronous snapshots;
+                       // confirm against the DefaultOperatorStateBackend constructor
+                       return new DefaultOperatorStateBackend(classLoader, executionConfig, true);
+               }
        }
 
        private static final class LockingOutputStreamFactory implements 
CheckpointStreamFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bdaf1e4/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 4677242..ce342c0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
@@ -112,6 +113,14 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                                TaskKvStateRegistry kvStateRegistry) throws 
IOException {
                        throw new SuccessException();
                }
+
+               /**
+                * Not implemented by this test backend: every call throws an
+                * {@link UnsupportedOperationException}.
+                */
+               @Override
+               public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier)
+                               throws Exception {
+                       throw new UnsupportedOperationException();
+               }
        }
 
        static final class SuccessException extends IOException {

Reply via email to