[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend 
(backport from 1.3)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6a80725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6a80725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6a80725

Branch: refs/heads/release-1.2
Commit: c6a80725053c49dd2064405577291bdc86c82003
Parents: b703a24
Author: Stefan Richter <[email protected]>
Authored: Thu Mar 23 11:36:56 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Fri Mar 24 18:51:19 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/MathUtils.java   |   47 +-
 .../state/AbstractKeyedStateBackend.java        |   18 +-
 .../state/StateTransformationFunction.java      |   42 +
 .../filesystem/async/AsyncFsStateBackend.java   |  266 +++++
 .../heap/async/AbstractHeapMergingState.java    |  104 ++
 .../state/heap/async/AbstractHeapState.java     |  119 ++
 .../heap/async/AbstractStateTableSnapshot.java  |   51 +
 .../heap/async/AsyncHeapKeyedStateBackend.java  |  433 +++++++
 .../state/heap/async/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
 .../async/CopyOnWriteStateTableSnapshot.java    |  188 +++
 .../state/heap/async/HeapFoldingState.java      |   99 ++
 .../runtime/state/heap/async/HeapListState.java |  122 ++
 .../state/heap/async/HeapReducingState.java     |  107 ++
 .../state/heap/async/HeapValueState.java        |   73 ++
 .../state/heap/async/InternalKeyContext.java    |   60 +
 .../runtime/state/heap/async/StateEntry.java    |   44 +
 .../runtime/state/heap/async/StateTable.java    |  189 ++++
 .../heap/async/StateTableByKeyGroupReader.java  |   38 +
 .../heap/async/StateTableByKeyGroupReaders.java |  136 +++
 .../state/heap/async/StateTableSnapshot.java    |   45 +
 .../memory/async/AsyncMemoryStateBackend.java   |   94 ++
 .../state/AsyncFileStateBackendTest.java        |  213 ++++
 .../state/AsyncMemoryStateBackendTest.java      |  197 ++++
 .../runtime/state/MemoryStateBackendTest.java   |    2 +-
 .../runtime/state/StateBackendTestBase.java     |  150 +++
 .../heap/async/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/async/HeapListStateTest.java     |  238 ++++
 .../state/heap/async/HeapReducingStateTest.java |  236 ++++
 .../heap/async/HeapStateBackendTestBase.java    |   37 +
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../api/windowing/windows/TimeWindow.java       |   49 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   14 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 34 files changed, 5106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..4c52b6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -22,13 +22,13 @@ package org.apache.flink.util;
  * Collection of simple mathematical routines.
  */
 public final class MathUtils {
-       
+
        /**
         * Computes the logarithm of the given value to the base of 2, rounded 
down. It corresponds to the
         * position of the highest non-zero bit. The position is counted, 
starting with 0 from the least
         * significant bit to the most significant bit. For example, 
<code>log2floor(16) = 4</code>, and
         * <code>log2floor(10) = 3</code>.
-        * 
+        *
         * @param value The value to compute the logarithm for.
         * @return The logarithm (rounded down) to the base of 2.
         * @throws ArithmeticException Thrown, if the given value is zero.
@@ -40,11 +40,11 @@ public final class MathUtils {
 
                return 31 - Integer.numberOfLeadingZeros(value);
        }
-       
+
        /**
         * Computes the logarithm of the given value to the base of 2. This 
method throws an error,
         * if the given argument is not a power of 2.
-        * 
+        *
         * @param value The value to compute the logarithm for.
         * @return The logarithm to the base of 2.
         * @throws ArithmeticException Thrown, if the given value is zero.
@@ -59,25 +59,25 @@ public final class MathUtils {
                }
                return 31 - Integer.numberOfLeadingZeros(value);
        }
-       
+
        /**
         * Decrements the given number down to the closest power of two. If the 
argument is a
         * power of two, it remains unchanged.
-        * 
+        *
         * @param value The value to round down.
         * @return The closest value that is a power of two and less or equal 
than the given value.
         */
        public static int roundDownToPowerOf2(int value) {
                return Integer.highestOneBit(value);
        }
-       
+
        /**
         * Casts the given value to a 32 bit integer, if it can be safely done. 
If the cast would change the numeric
         * value, this method raises an exception.
         * <p>
         * This method is a protection in places where one expects to be able 
to safely case, but where unexpected
         * situations could make the cast unsafe and would cause hidden 
problems that are hard to track down.
-        * 
+        *
         * @param value The value to be cast to an integer.
         * @return The given value as an integer.
         * @see Math#toIntExact(long)
@@ -172,8 +172,37 @@ public final class MathUtils {
                return x + 1;
        }
 
+       /**
+        * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+        *
+        * <p>The xor-shifts by 30, 27 and 31 bits and the multiplications with the two 64-bit constants are the
+        * finalization steps of the SplitMix64 generator. The result is the low 32 bits of the fully mixed long,
+        * so all 64 input bits influence the returned int.
+        *
+        * @param in the long (64-bit) input.
+        * @return the bit-mixed int (32-bit) output
+        */
+       public static int longToIntWithBitMixing(long in) {
+               in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+               in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+               in = in ^ (in >>> 31);
+               return (int) in;
+       }
+
+       /**
+        * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+        * from Murmur's 32 bit finalizer (the {@code fmix32} step of MurmurHash3: xor-shift 16, multiply, xor-shift 13,
+        * multiply, xor-shift 16). Multiplications wrap around on overflow, as intended for hashing.
+        *
+        * @param in the input value
+        * @return the bit-mixed output value
+        */
+       public static int bitMix(int in) {
+               in ^= in >>> 16;
+               in *= 0x85ebca6b;
+               in ^= in >>> 13;
+               in *= 0xc2b2ae35;
+               in ^= in >>> 16;
+               return in;
+       }
+
        // 
============================================================================================
-       
+
        /**
         * Prevent Instantiation through private constructor.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2daf896..23c9a49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -35,6 +35,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
+import org.apache.flink.runtime.state.heap.async.InternalKeyContext;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
@@ -51,7 +53,7 @@ import java.util.List;
  * @param <K> Type of the key by which state is keyed.
  */
 public abstract class AbstractKeyedStateBackend<K>
-               implements KeyedStateBackend<K>, 
Snapshotable<KeyGroupsStateHandle>, Closeable {
+               implements KeyedStateBackend<K>, 
Snapshotable<KeyGroupsStateHandle>, Closeable, InternalKeyContext<K> {
 
        /** {@link TypeSerializer} for our key. */
        protected final TypeSerializer<K> keySerializer;
@@ -205,6 +207,7 @@ public abstract class AbstractKeyedStateBackend<K>
        /**
         * @see KeyedStateBackend
         */
+       @Override
        public KeyGroupRange getKeyGroupRange() {
                return keyGroupRange;
        }
@@ -293,10 +296,16 @@ public abstract class AbstractKeyedStateBackend<K>
        @Override
        @SuppressWarnings("unchecked,rawtypes")
        public <N, S extends MergingState<?, ?>> void 
mergePartitionedStates(final N target, Collection<N> sources, final 
TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
-               if (stateDescriptor instanceof ReducingStateDescriptor) {
+
+               State stateRef = getPartitionedState(target, 
namespaceSerializer, stateDescriptor);
+               if (stateRef instanceof AbstractHeapMergingState) {
+
+                       ((AbstractHeapMergingState) 
stateRef).mergeNamespaces(target, sources);
+               } else if (stateDescriptor instanceof ReducingStateDescriptor) {
+
                        ReducingStateDescriptor reducingStateDescriptor = 
(ReducingStateDescriptor) stateDescriptor;
+                       ReducingState state = (ReducingState) stateRef;
                        ReduceFunction reduceFn = 
reducingStateDescriptor.getReduceFunction();
-                       ReducingState state = (ReducingState) 
getPartitionedState(target, namespaceSerializer, stateDescriptor);
                        KvState kvState = (KvState) state;
                        Object result = null;
                        for (N source: sources) {
@@ -314,7 +323,8 @@ public abstract class AbstractKeyedStateBackend<K>
                                state.add(result);
                        }
                } else if (stateDescriptor instanceof ListStateDescriptor) {
-                       ListState<Object> state = (ListState) 
getPartitionedState(target, namespaceSerializer, stateDescriptor);
+
+                       ListState<Object> state = (ListState) stateRef;
                        KvState kvState = (KvState) state;
                        List<Object> result = new ArrayList<>();
                        for (N source: sources) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..182b4c8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a binary function that is used for push-down of state transformation into state backends. The
+ * function takes as inputs the old state and an element. From those inputs, the function computes the new state.
+ *
+ * @param <S> type of the previous state that is the basis for the computation of the new state.
+ * @param <T> type of the element value that is used to compute the change of state.
+ */
+@Internal
+public interface StateTransformationFunction<S, T> {
+
+       /**
+        * Binary function that applies a given value to the given old state to compute the new state.
+        *
+        * <p>NOTE(review): callers may pass {@code null} as {@code previousState} when no state exists yet
+        * (heap-based implementations check for this) - confirm against the calling state table.
+        *
+        * @param previousState the previous state that is the basis for the transformation.
+        * @param value         the value that the implementation applies to the old state to obtain the new state.
+        * @return the new state, computed by applying the given value on the given old state.
+        * @throws Exception if something goes wrong in applying the transformation function.
+        */
+       S apply(S previousState, T value) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
new file mode 100644
index 0000000..d90ffbd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ *
+ * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
+ * files for each state, for example:
+ *
+ * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ *
+ * <p>Keyed state is kept on the heap in an {@link AsyncHeapKeyedStateBackend}; checkpoint data is
+ * written through an {@link FsCheckpointStreamFactory} rooted at {@link #getBasePath()}.
+ */
+public class AsyncFsStateBackend extends AbstractStateBackend {
+
+       private static final long serialVersionUID = -8191916350224044011L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(AsyncFsStateBackend.class);
+
+       /** By default, state smaller than 1024 bytes will not be written to files, but
+        * will be stored directly with the metadata */
+       public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+       /** Maximum size of state that is stored with the metadata, rather than in files */
+       private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+
+       /** The path to the directory for the checkpoint data, including the file system
+        * description via scheme and optional authority */
+       private final Path basePath;
+
+       /** State below this size will be stored as part of the metadata, rather than in files */
+       private final int fileStateThreshold;
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public AsyncFsStateBackend(String checkpointDataUri) throws IOException {
+               this(new Path(checkpointDataUri));
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public AsyncFsStateBackend(Path checkpointDataUri) throws IOException {
+               this(checkpointDataUri.toUri());
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI, using {@link #DEFAULT_FILE_STATE_THRESHOLD} as the file state threshold.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public AsyncFsStateBackend(URI checkpointDataUri) throws IOException {
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+        *                             rather than in files
+        *
+        * @throws IllegalArgumentException Thrown, if the threshold is negative or larger than 1 MiB, or if
+        *                                  the URI misses scheme or path.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public AsyncFsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
+               if (fileStateSizeThreshold < 0) {
+                       throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
+               }
+               if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
+                       throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
+                               MAX_FILE_STATE_THRESHOLD);
+               }
+               this.fileStateThreshold = fileStateSizeThreshold;
+
+               this.basePath = validateAndNormalizeUri(checkpointDataUri);
+       }
+
+       /**
+        * Gets the base directory where all state-containing files are stored.
+        * The job specific directory is created inside this directory.
+        *
+        * @return The base directory.
+        */
+       public Path getBasePath() {
+               return basePath;
+       }
+
+       // ------------------------------------------------------------------------
+       //  initialization and cleanup
+       // ------------------------------------------------------------------------
+
+       @Override
+       public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+               return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+       }
+
+       @Override
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+               return new AsyncHeapKeyedStateBackend<>(
+                               kvStateRegistry,
+                               keySerializer,
+                               env.getUserClassLoader(),
+                               numberOfKeyGroups,
+                               keyGroupRange);
+       }
+
+       @Override
+       public String toString() {
+               return "File State Backend @ " + basePath;
+       }
+
+       /**
+        * Checks and normalizes the checkpoint data URI. This method first checks the validity of the
+        * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+        * to a path.
+        *
+        * <p>If the URI does not include an authority, but the file system configured for the URI has an
+        * authority, then the normalized path will include this authority.
+        *
+        * <p>If the scheme is not one that Flink supports natively, or the file system cannot be resolved,
+        * or the normalized URI cannot be built, a warning is logged (where applicable) and the URI is
+        * returned unchanged as a {@link Path}.
+        *
+        * @param checkpointDataUri The URI to check and normalize.
+        * @return A normalized URI as a Path.
+        *
+        * @throws IllegalArgumentException Thrown, if the URI misses scheme or path, or the path is the root directory.
+        * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+        */
+       public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
+               final String scheme = checkpointDataUri.getScheme();
+               final String path = checkpointDataUri.getPath();
+
+               // some validity checks
+               if (scheme == null) {
+                       throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+                                       "Please specify the file system scheme explicitly in the URI.");
+               }
+               if (path == null) {
+                       throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+                                       "Please specify a directory path for the checkpoint data.");
+               }
+               if (path.isEmpty() || path.equals("/")) {
+                       throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+               }
+
+               if (!FileSystem.isFlinkSupportedScheme(scheme)) {
+                       // skip verification checks for non-flink supported filesystem
+                       // this is because the required filesystem classes may not be available to the flink client
+                       return new Path(checkpointDataUri);
+               }
+
+               // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
+               // (distributed) filesystem on all hosts and includes full host/port information, even if the
+               // original URI did not include that. We count on the filesystem loading from the configuration
+               // to fill in the missing data.
+
+               // try to grab the file system for this path/URI
+               FileSystem filesystem = FileSystem.get(checkpointDataUri);
+               if (filesystem == null) {
+                       // note the trailing space: the two literals are concatenated
+                       String reason = "Could not find a file system for the given scheme in " +
+                                       "the available configurations.";
+                       LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+                                       "problem or by the fact that the file system is not accessible from the " +
+                                       "client. Reason: {}", reason);
+                       return new Path(checkpointDataUri);
+               }
+
+               URI fsURI = filesystem.getUri();
+               try {
+                       URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
+                       return new Path(baseURI);
+               } catch (URISyntaxException e) {
+                       // pass the exception as a format argument instead of concatenating it into the format
+                       // string, so that a '%' in its message cannot make String.format throw
+                       String reason = String.format(
+                                       "Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: %s",
+                                       checkpointDataUri,
+                                       fsURI,
+                                       e);
+                       LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+                                       "problem or by the fact that the file system is not accessible from the " +
+                                       "client. Reason: {}", reason);
+                       return new Path(checkpointDataUri);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
new file mode 100644
index 0000000..1b09d9c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+
+import java.util.Collection;
+
+/**
+ * Base class for {@link MergingState} implementations that keep their values on the heap, in a
+ * {@link StateTable}. Subclasses only define how two state values are combined ({@link #mergeState}).
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <IN> Input type parameter of the corresponding merging state (not used directly in this class).
+ * @param <OUT> Output type parameter of the corresponding merging state (not used directly in this class).
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+               extends AbstractHeapState<K, N, SV, S, SD> {
+
+       /**
+        * Transformation that folds the combined source value into the target namespace; created once and reused.
+        */
+       private final MergeTransformation mergeTransformation;
+
+       /**
+        * Creates a new key/value state backed by the given state table.
+        *
+        * @param stateDesc           The state identifier for the state. This contains name
+        *                            and can create a default state value.
+        * @param stateTable          The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer       The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespaces.
+        */
+       protected AbstractHeapMergingState(
+                       SD stateDesc,
+                       StateTable<K, N, SV> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+
+               super(stateDesc, stateTable, keySerializer, namespaceSerializer);
+               mergeTransformation = new MergeTransformation();
+       }
+
+       /**
+        * Merges the state of all source namespaces into the target namespace.
+        *
+        * <p>Each source namespace is removed from the state table. The non-null removed values are combined
+        * with {@link #mergeState} in iteration order, and the combined value (if any) is then merged into the
+        * target namespace's existing value via the table's {@code transform}. The table methods used here take
+        * only a namespace; NOTE(review): presumably they act on the backend's current key - confirm in StateTable.
+        *
+        * @param target  the namespace that receives the merged state.
+        * @param sources the namespaces whose state is merged and removed; {@code null} or empty is a no-op.
+        * @throws Exception if combining state values fails.
+        */
+       public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+               if (sources == null || sources.isEmpty()) {
+                       return; // nothing to do
+               }
+
+               final StateTable<K, N, SV> table = stateTable;
+
+               SV accumulated = null;
+
+               // remove every source namespace and combine the values found there
+               for (N sourceNamespace : sources) {
+                       SV removed = table.removeAndGetOld(sourceNamespace);
+
+                       if (accumulated == null) {
+                               accumulated = removed;
+                       } else if (removed != null) {
+                               accumulated = mergeState(accumulated, removed);
+                       }
+               }
+
+               // only touch the target if at least one source contributed a value
+               if (accumulated != null) {
+                       table.transform(target, accumulated, mergeTransformation);
+               }
+       }
+
+       /**
+        * Combines two non-null state values into one.
+        *
+        * @param a the first value (the accumulated or target-side value).
+        * @param b the second value (the value being merged in).
+        * @return the combined value.
+        * @throws Exception if combining fails.
+        */
+       protected abstract SV mergeState(SV a, SV b) throws Exception;
+
+       /**
+        * Merges an already-combined source value into the target's existing value, or takes it as-is
+        * when the target has no value yet.
+        */
+       final class MergeTransformation implements StateTransformationFunction<SV, SV> {
+
+               @Override
+               public SV apply(SV targetState, SV merged) throws Exception {
+                       return targetState == null ? merged : mergeState(targetState, merged);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
new file mode 100644
index 0000000..c93ea6a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for partitioned heap state implementations (such as {@link ListState}) that keep their
+ * values in a {@link StateTable}. This class manages the current namespace, clearing, and serving
+ * serialized values for queryable-state requests; subclasses implement the state-type specific
+ * access methods.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+       implements KvState<N>, State {
+
+       /** Map containing the actual key/value pairs */
+       protected final StateTable<K, N, SV> stateTable;
+
+       /** This holds the name of the state and can create an initial default value for the state. */
+       protected final SD stateDesc;
+
+       /** The current namespace, which the access methods will refer to. */
+       protected N currentNamespace;
+
+       /** Serializer for the keys; used to deserialize serialized key/namespace pairs. */
+       protected final TypeSerializer<K> keySerializer;
+
+       /** Serializer for the namespaces; used to deserialize serialized key/namespace pairs. */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /**
+        * Creates a new key/value state backed by the given state table.
+        *
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                  and can create a default state value.
+        * @param stateTable The state table to use in this key/value state. May contain initial state.
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespaces.
+        * @throws NullPointerException if {@code stateTable} is null.
+        */
+       protected AbstractHeapState(
+                       SD stateDesc,
+                       StateTable<K, N, SV> stateTable,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
+
+               this.stateDesc = stateDesc;
+               this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
+               this.keySerializer = keySerializer;
+               this.namespaceSerializer = namespaceSerializer;
+               this.currentNamespace = null;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Removes the value stored under the current namespace from the state table. The key is not
+        * passed here; presumably the table resolves it from the backend's current key (TODO confirm).
+        */
+       @Override
+       public final void clear() {
+               stateTable.remove(currentNamespace);
+       }
+
+       /**
+        * Sets the namespace that subsequent state accesses refer to.
+        *
+        * @param namespace The namespace; must not be null.
+        * @throws NullPointerException if {@code namespace} is null.
+        */
+       @Override
+       public final void setCurrentNamespace(N namespace) {
+               this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null.");
+       }
+
+       /**
+        * Deserializes the given key/namespace pair and returns the serialized value stored for it.
+        *
+        * @param serializedKeyAndNamespace The serialized key and namespace; must not be null.
+        * @return The serialized value, or {@code null} if no value is stored.
+        * @throws Exception if (de)serialization fails.
+        */
+       @Override
+       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+               Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+                               serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+               return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
+       }
+
+       /**
+        * Returns the value stored for the given key and namespace, serialized with the state
+        * descriptor's serializer, or {@code null} if there is no value.
+        *
+        * @param key The key to look up; must not be null.
+        * @param namespace The namespace to look up; must not be null.
+        * @return The serialized value, or {@code null} if no value is stored.
+        * @throws IllegalStateException if {@code key} or {@code namespace} is null.
+        * @throws Exception if serialization fails.
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public byte[] getSerializedValue(K key, N namespace) throws Exception {
+               Preconditions.checkState(namespace != null, "No namespace given.");
+               Preconditions.checkState(key != null, "No key given.");
+
+               SV result = stateTable.get(key, namespace);
+
+               if (result == null) {
+                       return null;
+               }
+
+               // SD's serializer type is a wildcard and not statically tied to SV, so a raw serializer is
+               // used. The suppression is method-level so it also covers the unchecked call below (a
+               // comma-joined single string such as "unchecked,rawtypes" suppresses nothing in javac).
+               TypeSerializer serializer = stateDesc.getSerializer();
+               return KvStateRequestSerializer.serializeValue(result, serializer);
+       }
+
+       /**
+        * This should only be used for testing.
+        */
+       @VisibleForTesting
+       public StateTable<K, N, SV> getStateTable() {
+               return stateTable;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..8a1d3f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for snapshots of {@link StateTable} implementations. A snapshot keeps a reference to the
+ * table it was created from; subclasses define how the snapshot is written during the serialization
+ * phase of checkpointing.
+ *
+ * @param <K> type of the key
+ * @param <N> type of the namespace
+ * @param <S> type of the state
+ * @param <T> type of the owning {@link StateTable}
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+       /** The {@link StateTable} from which this snapshot was created; never null. */
+       final T owningStateTable;
+
+       /**
+        * Creates a new snapshot that is owned by the given table.
+        *
+        * @param stateTable the {@link StateTable} for which this object represents a snapshot.
+        * @throws NullPointerException if {@code stateTable} is null.
+        */
+       AbstractStateTableSnapshot(T stateTable) {
+               Preconditions.checkNotNull(stateTable);
+               this.owningStateTable = stateTable;
+       }
+
+       /**
+        * Optional hook to release resources at the end of this snapshot's lifecycle. The base
+        * implementation holds nothing and therefore does nothing.
+        */
+       @Override
+       public void release() {
+               // nothing to release by default
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
new file mode 100644
index 0000000..e19ed00
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
+ * streams provided by a {@link CheckpointStreamFactory} upon checkpointing.
+ *
+ * <p>Snapshots are taken in two phases. The synchronous phase creates a {@link StateTableSnapshot}
+ * for every registered {@link StateTable}. The asynchronous phase writes those table snapshots to
+ * the checkpoint stream and never touches the live {@code stateTables} map.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class AsyncHeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
+
+       /**
+        * Map of state tables, by state name, that stores all state of key/value states. We store it
+        * centrally so that we can easily checkpoint/restore it.
+        *
+        * <p>The namespace and value type parameters are wildcards because different key/value states
+        * with different value and namespace types share this central map.
+        *
+        * <p>This is a plain {@link HashMap}, which is not safe for concurrent access. The asynchronous
+        * part of a snapshot must therefore only work with the table snapshots taken synchronously.
+        */
+       private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+
+       /**
+        * Creates a new backend; all arguments are handed to {@link AbstractKeyedStateBackend}.
+        */
+       public AsyncHeapKeyedStateBackend(
+                       TaskKvStateRegistry kvStateRegistry,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader userCodeClassLoader,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange) {
+
+               super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+               LOG.info("Initializing heap keyed state backend with stream factory.");
+       }
+
+       // ------------------------------------------------------------------------
+       //  state backend operations
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the state table for the given descriptor, creating and registering it if needed.
+        */
+       private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+                       TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+               return tryRegisterStateTable(
+                               stateDesc.getName(), stateDesc.getType(),
+                               namespaceSerializer, stateDesc.getSerializer());
+       }
+
+       /**
+        * Returns the state table registered under {@code stateName}, creating it if it does not exist
+        * yet. An existing table gets the new meta info, provided it is compatible with the current one.
+        *
+        * @throws RuntimeException if an existing table's meta info is incompatible with the new one.
+        */
+       private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+                       String stateName,
+                       StateDescriptor.Type stateType,
+                       TypeSerializer<N> namespaceSerializer,
+                       TypeSerializer<V> valueSerializer) {
+
+               final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+                               new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+               @SuppressWarnings("unchecked")
+               StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
+
+               if (stateTable == null) {
+                       stateTable = newStateTable(newMetaInfo);
+                       stateTables.put(stateName, stateTable);
+               } else {
+                       if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+                               throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+                                               stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+                       }
+                       stateTable.setMetaInfo(newMetaInfo);
+               }
+               return stateTable;
+       }
+
+       private boolean hasRegisteredState() {
+               return !stateTables.isEmpty();
+       }
+
+       @Override
+       public <N, V> ValueState<V> createValueState(
+               TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<V> stateDesc) throws Exception {
+
+               StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+               return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       public <N, T> ListState<T> createListState(
+               TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<T> stateDesc) throws Exception {
+
+               // the list state does some manual mapping, because the state is typed to the generic
+               // 'List' interface, but we want to use an implementation typed to ArrayList
+               // using a more specialized implementation opens up runtime optimizations
+
+               StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+                       stateDesc.getName(),
+                       stateDesc.getType(),
+                       namespaceSerializer,
+                       new ArrayListSerializer<T>(stateDesc.getSerializer()));
+
+               return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       public <N, T> ReducingState<T> createReducingState(
+               TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+               StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+               return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       @Override
+       public <N, T, ACC> FoldingState<T, ACC> createFoldingState(
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+               StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+               return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
+       }
+
+       /**
+        * Takes a snapshot of all registered state. The synchronous part captures the meta info and a
+        * {@link StateTableSnapshot} per table. The returned task writes those snapshots to a stream from
+        * {@code streamFactory}, and it releases the table snapshots when done.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public RunnableFuture<KeyGroupsStateHandle> snapshot(
+                       final long checkpointId,
+                       final long timestamp,
+                       final CheckpointStreamFactory streamFactory) throws Exception {
+
+               if (!hasRegisteredState()) {
+                       return DoneFuture.nullValue();
+               }
+
+               long syncStartTime = System.currentTimeMillis();
+
+               Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
+                               "Too many KV-States: " + stateTables.size() +
+                                               ". Currently at most " + Short.MAX_VALUE + " states are supported");
+
+               List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+
+               // kv-state id per state name. The id equals the position of the state's meta info in
+               // metaInfoProxyList, which restorePartitionedState() relies on.
+               final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+
+               // Table snapshots by state name. After the synchronous part, this map and kVStateToId are
+               // the only state the asynchronous part reads; the live stateTables map may change concurrently.
+               final Map<String, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
+
+               for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+                       StateTable<K, ?, ?> stateTable = kvState.getValue();
+                       RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateTable.getMetaInfo();
+                       KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+                                       metaInfo.getStateType(),
+                                       metaInfo.getName(),
+                                       metaInfo.getNamespaceSerializer(),
+                                       metaInfo.getStateSerializer());
+
+                       metaInfoProxyList.add(metaInfoProxy);
+                       kVStateToId.put(kvState.getKey(), kVStateToId.size());
+                       cowStateStableSnapshots.put(kvState.getKey(), stateTable.createSnapshot());
+               }
+
+               final KeyedBackendSerializationProxy serializationProxy =
+                               new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+               //--------------------------------------------------- this becomes the end of sync part
+
+               // implementation of the async IO operation, based on FutureTask
+               final AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+                               new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+                                       final AtomicBoolean open = new AtomicBoolean(false);
+
+                                       @Override
+                                       public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+                                               if (open.compareAndSet(false, true)) {
+                                                       CheckpointStreamFactory.CheckpointStateOutputStream stream =
+                                                                       streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+                                                       try {
+                                                               cancelStreamRegistry.registerClosable(stream);
+                                                               return stream;
+                                                       } catch (Exception ex) {
+                                                               open.set(false);
+                                                               throw ex;
+                                                       }
+                                               } else {
+                                                       throw new IOException("Operation already opened.");
+                                               }
+                                       }
+
+                                       @Override
+                                       public KeyGroupsStateHandle performOperation() throws Exception {
+                                               long asyncStartTime = System.currentTimeMillis();
+                                               CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+                                               DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+                                               serializationProxy.write(outView);
+
+                                               long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
+
+                                               for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
+                                                       int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
+                                                       keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+                                                       outView.writeInt(keyGroupId);
+
+                                                       // Iterate the snapshots captured in the synchronous part, never the live
+                                                       // stateTables map. The task thread may register new states meanwhile.
+                                                       for (Map.Entry<String, StateTableSnapshot> kvState : cowStateStableSnapshots.entrySet()) {
+                                                               outView.writeShort(kVStateToId.get(kvState.getKey()));
+                                                               kvState.getValue().writeMappingsInKeyGroup(outView, keyGroupId);
+                                                       }
+                                               }
+
+                                               if (open.compareAndSet(true, false)) {
+                                                       StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+                                                       KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+                                                       final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+                                                       LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+                                                               streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+
+                                                       return keyGroupsStateHandle;
+                                               } else {
+                                                       throw new IOException("Checkpoint stream already closed.");
+                                               }
+                                       }
+
+                                       @Override
+                                       public void done(boolean canceled) {
+                                               if (open.compareAndSet(true, false)) {
+                                                       CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+                                                       if (null != stream) {
+                                                               cancelStreamRegistry.unregisterClosable(stream);
+                                                               IOUtils.closeQuietly(stream);
+                                                       }
+                                               }
+                                               for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+                                                       snapshot.release();
+                                               }
+                                       }
+                               };
+
+               AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
+
+               LOG.info("Heap backend snapshot ({}, synchronous part) in thread {} took {} ms.",
+                               streamFactory, Thread.currentThread(), (System.currentTimeMillis() - syncStartTime));
+
+               return task;
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
+               LOG.info("Initializing heap keyed state backend from snapshot.");
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
+               }
+
+               if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+                       throw new UnsupportedOperationException(
+                               "This async.HeapKeyedStateBackend does not support restore from old savepoints.");
+               } else {
+                       restorePartitionedState(restoredState);
+               }
+       }
+
+       /**
+        * Replaces all registered state with the state read from the given handles. Null handles are
+        * skipped, and tables are created for state names that are not registered yet.
+        */
+       @SuppressWarnings({"unchecked"})
+       private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
+
+               stateTables.clear();
+
+               for (KeyGroupsStateHandle keyGroupsHandle : state) {
+
+                       if (keyGroupsHandle == null) {
+                               continue;
+                       }
+
+                       FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
+                       cancelStreamRegistry.registerClosable(fsDataInputStream);
+
+                       try {
+                               DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
+
+                               KeyedBackendSerializationProxy serializationProxy =
+                                               new KeyedBackendSerializationProxy(userCodeClassLoader);
+
+                               serializationProxy.read(inView);
+
+                               List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+                                               serializationProxy.getNamedStateSerializationProxies();
+
+                               for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+                                       StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+
+                                       //important: only create a new table we did not already create it previously
+                                       if (null == stateTable) {
+
+                                               RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+                                                               new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+                                               stateTable = newStateTable(registeredBackendStateMetaInfo);
+                                               stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+                                       }
+                               }
+
+                               for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
+                                       int keyGroupIndex = groupOffset.f0;
+                                       long offset = groupOffset.f1;
+                                       fsDataInputStream.seek(offset);
+
+                                       int writtenKeyGroupIndex = inView.readInt();
+
+                                       Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+                                                       "Unexpected key-group in restore.");
+
+                                       for (int i = 0; i < metaInfoList.size(); i++) {
+                                               int kvStateId = inView.readShort();
+
+                                               // A kv-state id is the position of the state's meta info within *this*
+                                               // handle (see snapshot()). Resolve it per handle, not through a mapping
+                                               // accumulated across handles that may contain other states or orders.
+                                               String stateName = metaInfoList.get(kvStateId).getStateName();
+                                               StateTable<K, ?, ?> stateTable = stateTables.get(stateName);
+
+                                               // Hardcoding 2 as version will lead to the right method for the
+                                               // serialization format. Due to the backport, we should keep this fix and do
+                                               // not allow restore from a different format.
+                                               StateTableByKeyGroupReader keyGroupReader =
+                                                               StateTableByKeyGroupReaders.readerForVersion(
+                                                                               stateTable,
+                                                                               2);
+
+                                               keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+                                       }
+                               }
+                       } finally {
+                               cancelStreamRegistry.unregisterClosable(fsDataInputStream);
+                               IOUtils.closeQuietly(fsDataInputStream);
+                       }
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "HeapKeyedStateBackend";
+       }
+
+       /**
+        * Returns the total number of state entries across all keys/namespaces.
+        */
+       @VisibleForTesting
+       public int numStateEntries() {
+               int sum = 0;
+               for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+                       sum += stateTable.size();
+               }
+               return sum;
+       }
+
+       /**
+        * Returns the total number of state entries across all keys for the given namespace.
+        */
+       @VisibleForTesting
+       public int numStateEntries(Object namespace) {
+               int sum = 0;
+               for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
+                       sum += stateTable.sizeOfNamespace(namespace);
+               }
+               return sum;
+       }
+
+       private <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+               return new CopyOnWriteStateTable<>(this, newMetaInfo);
+       }
+}

Reply via email to