[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab014ef9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab014ef9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab014ef9

Branch: refs/heads/master
Commit: ab014ef94e0e9137ac6f8f41dae385ff71e8ba5b
Parents: 30bb958
Author: Stefan Richter <[email protected]>
Authored: Fri Mar 3 10:51:15 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Thu Mar 16 18:34:02 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |    5 +
 .../state/RocksDBStateBackendTest.java          |   86 +-
 .../java/org/apache/flink/util/MathUtils.java   |   35 +-
 .../filesystem/AbstractFsStateSnapshot.java     |   30 +-
 .../state/memory/AbstractMemStateSnapshot.java  |   40 +-
 .../AbstractMigrationRestoreStrategy.java       |  117 ++
 .../state/memory/MigrationRestoreSnapshot.java  |   32 +
 .../state/AbstractKeyedStateBackend.java        |    7 +
 .../runtime/state/ArrayListSerializer.java      |   14 +-
 .../state/KeyedBackendSerializationProxy.java   |   23 +-
 .../flink/runtime/state/KeyedStateBackend.java  |   28 +-
 .../state/StateTransformationFunction.java      |   23 +
 .../state/filesystem/FsStateBackend.java        |  109 +-
 .../state/heap/AbstractHeapMergingState.java    |   93 +-
 .../runtime/state/heap/AbstractHeapState.java   |   84 +-
 .../state/heap/AbstractStateTableSnapshot.java  |   51 +
 .../state/heap/CopyOnWriteStateTable.java       | 1066 ++++++++++++++++++
 .../heap/CopyOnWriteStateTableSnapshot.java     |  188 +++
 .../state/heap/HeapAggregatingState.java        |   92 +-
 .../runtime/state/heap/HeapFoldingState.java    |   70 +-
 .../state/heap/HeapKeyedStateBackend.java       |  426 +++----
 .../flink/runtime/state/heap/HeapListState.java |   63 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  167 +--
 .../runtime/state/heap/HeapReducingState.java   |   82 +-
 .../runtime/state/heap/HeapValueState.java      |   47 +-
 .../runtime/state/heap/InternalKeyContext.java  |   60 +
 .../state/heap/NestedMapsStateTable.java        |  363 ++++++
 .../flink/runtime/state/heap/StateEntry.java    |   44 +
 .../flink/runtime/state/heap/StateTable.java    |  222 ++--
 .../state/heap/StateTableByKeyGroupReader.java  |   38 +
 .../state/heap/StateTableByKeyGroupReaders.java |  136 +++
 .../runtime/state/heap/StateTableSnapshot.java  |   45 +
 .../state/memory/MemoryStateBackend.java        |   26 +
 .../runtime/query/QueryableStateClientTest.java |    7 +-
 .../message/KvStateRequestSerializerTest.java   |   38 +-
 .../state/AsyncFileStateBackendTest.java        |   27 +
 .../state/AsyncMemoryStateBackendTest.java      |   27 +
 .../runtime/state/FileStateBackendTest.java     |    8 +-
 .../runtime/state/MemoryStateBackendTest.java   |    8 +-
 .../runtime/state/StateBackendTestBase.java     |  216 +++-
 .../state/heap/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/HeapAggregatingStateTest.java    |   21 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |  173 +++
 .../runtime/state/heap/HeapListStateTest.java   |   17 +-
 .../state/heap/HeapReducingStateTest.java       |   23 +-
 .../state/heap/HeapStateBackendTestBase.java    |   54 +
 .../StateTableSnapshotCompatibilityTest.java    |  118 ++
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../heap_keyed_statebackend_1_2.snapshot        |  Bin 0 -> 2068 bytes
 .../api/windowing/windows/TimeWindow.java       |    5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   64 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   20 +
 .../test/state/ManualWindowSpeedITCase.java     |    7 +-
 55 files changed, 4239 insertions(+), 1162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index aaccc2f..f585d21 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1219,4 +1219,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        // expected
                }
        }
+
+       @Override
+       public boolean supportsAsynchronousSnapshots() {
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index c7b5c20..708613b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,14 +31,13 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -364,89 +363,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
                assertEquals(null, keyedStateBackend.db);
        }
 
-       static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
-               private final int maxSize;
-               private int afterNumberInvocations;
-               private OneShotLatch blocker;
-               private OneShotLatch waiter;
-
-               MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
-               public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
-                       return lastCreatedStream;
-               }
-
-               public BlockerCheckpointStreamFactory(int maxSize) {
-                       this.maxSize = maxSize;
-               }
-
-               public void setAfterNumberInvocations(int afterNumberInvocations) {
-                       this.afterNumberInvocations = afterNumberInvocations;
-               }
-
-               public void setBlockerLatch(OneShotLatch latch) {
-                       this.blocker = latch;
-               }
-
-               public void setWaiterLatch(OneShotLatch latch) {
-                       this.waiter = latch;
-               }
-
-               @Override
-               public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-                       waiter.trigger();
-                       this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-                               private int afterNInvocations = afterNumberInvocations;
-                               private final OneShotLatch streamBlocker = blocker;
-                               private final OneShotLatch streamWaiter = waiter;
-
-                               @Override
-                               public void write(int b) throws IOException {
-
-                                       if (afterNInvocations > 0) {
-                                               --afterNInvocations;
-                                       }
-
-                                       if (0 == afterNInvocations && null != streamBlocker) {
-                                               try {
-                                                       streamBlocker.await();
-                                               } catch (InterruptedException ignored) {
-                                               }
-                                       }
-                                       try {
-                                               super.write(b);
-                                       } catch (IOException ex) {
-                                               if (null != streamWaiter) {
-                                                       streamWaiter.trigger();
-                                               }
-                                               throw ex;
-                                       }
-
-                                       if (0 == afterNInvocations && null != streamWaiter) {
-                                               streamWaiter.trigger();
-                                       }
-                               }
-
-                               @Override
-                               public void close() {
-                                       super.close();
-                                       if (null != streamWaiter) {
-                                               streamWaiter.trigger();
-                                       }
-                               }
-                       };
-
-                       return lastCreatedStream;
-               }
-
-               @Override
-               public void close() throws Exception {
-
-               }
-       }
-
        private static class AcceptAllFilter implements IOFileFilter {
                @Override
                public boolean accept(File file) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..1d84a39 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -140,11 +140,7 @@ public final class MathUtils {
                code = code * 5 + 0xe6546b64;
 
                code ^= 4;
-               code ^= code >>> 16;
-               code *= 0x85ebca6b;
-               code ^= code >>> 13;
-               code *= 0xc2b2ae35;
-               code ^= code >>> 16;
+               code = bitMix(code);
 
                if (code >= 0) {
                        return code;
@@ -172,6 +168,35 @@ public final class MathUtils {
                return x + 1;
        }
 
+       /**
+        * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+        *
+        * @param in the long (64-bit)input.
+        * @return the bit-mixed int (32-bit) output
+        */
+       public static int longToIntWithBitMixing(long in) {
+               in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+               in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+               in = in ^ (in >>> 31);
+               return (int) in;
+       }
+
+       /**
+        * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+        * from Murmur's 32 bit finalizer.
+        *
+        * @param in the input value
+        * @return the bit-mixed output value
+        */
+       public static int bitMix(int in) {
+               in ^= in >>> 16;
+               in *= 0x85ebca6b;
+               in ^= in >>> 13;
+               in *= 0xc2b2ae35;
+               in ^= in >>> 16;
+               return in;
+       }
+
        // ============================================================================================
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 103c214..a15e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -21,8 +21,16 @@ package org.apache.flink.migration.runtime.state.filesystem;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.AbstractMigrationRestoreStrategy;
+import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 
 import java.io.IOException;
 
@@ -36,7 +44,7 @@ import java.io.IOException;
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-               extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD> {
+               extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
        private static final long serialVersionUID = 1L;
 
@@ -85,4 +93,24 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
        public SD getStateDesc() {
                return stateDesc;
        }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public StateTable<K, N, SV> deserialize(
+                       String stateName,
+                       HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+               final FileSystem fs = getFilePath().getFileSystem();
+               try (FSDataInputStream inStream = fs.open(getFilePath())) {
+                       final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
+                       AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+                                       new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+                                               @Override
+                                               protected DataInputView openDataInputView() throws IOException {
+                                                       return inView;
+                                               }
+                                       };
+                       return restoreStrategy.deserialize(stateName, stateBackend);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
index 6056578..ff86f7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -21,17 +21,18 @@ package org.apache.flink.migration.runtime.state.memory;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 @Deprecated
 @SuppressWarnings("deprecation")
 public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-               implements KvStateSnapshot<K, N, S, SD> {
+               implements KvStateSnapshot<K, N, S, SD>, MigrationRestoreSnapshot<K, N, SV> {
 
        private static final long serialVersionUID = 1L;
 
@@ -73,24 +74,21 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
                this.data = data;
        }
 
-       public HashMap<N, Map<K, SV>> deserialize() throws IOException {
-               DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
-
-               final int numKeys = inView.readInt();
-               HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
-               for (int i = 0; i < numKeys && !closed; i++) {
-                       N namespace = namespaceSerializer.deserialize(inView);
-                       final int numValues = inView.readInt();
-                       Map<K, SV> namespaceMap = new HashMap<>(numValues);
-                       stateMap.put(namespace, namespaceMap);
-                       for (int j = 0; j < numValues; j++) {
-                               K key = keySerializer.deserialize(inView);
-                               SV value = stateSerializer.deserialize(inView);
-                               namespaceMap.put(key, value);
-                       }
-               }
-               return stateMap;
+       @Override
+       @SuppressWarnings("unchecked")
+       public StateTable<K, N, SV> deserialize(
+                       String stateName,
+                       HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+               final DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+               AbstractMigrationRestoreStrategy<K, N, SV> restoreStrategy =
+                               new AbstractMigrationRestoreStrategy<K, N, SV>(keySerializer, namespaceSerializer, stateSerializer) {
+                                       @Override
+                                       protected DataInputView openDataInputView() throws IOException {
+                                               return inView;
+                                       }
+                               };
+               return restoreStrategy.deserialize(stateName, stateBackend);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
new file mode 100644
index 0000000..e572619
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class outlines the general strategy to restore from migration states.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ * @param <S> type of state.
+ */
+@Deprecated
+public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements MigrationRestoreSnapshot<K, N, S> {
+
+       /**
+        * Key Serializer
+        */
+       protected final TypeSerializer<K> keySerializer;
+
+       /**
+        * Namespace Serializer
+        */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /**
+        * Serializer for the state value
+        */
+       protected final TypeSerializer<S> stateSerializer;
+
+       public AbstractMigrationRestoreStrategy(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       TypeSerializer<S> stateSerializer) {
+
+               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+               this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+               this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+       }
+
+       @Override
+       public StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException {
+
+               Preconditions.checkNotNull(stateName, "State name is null. Cannot deserialize snapshot.");
+               Preconditions.checkNotNull(stateBackend, "State backend is null. Cannot deserialize snapshot.");
+
+               final KeyGroupRange keyGroupRange = stateBackend.getKeyGroupRange();
+               Preconditions.checkState(1 == keyGroupRange.getNumberOfKeyGroups(),
+                               "Unexpected number of key-groups for restoring from Flink 1.1");
+
+               TypeSerializer<N> patchedNamespaceSerializer = this.namespaceSerializer;
+
+               if (patchedNamespaceSerializer instanceof VoidSerializer) {
+                       patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
+               }
+
+               RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               stateName,
+                                               patchedNamespaceSerializer,
+                                               stateSerializer);
+
+               final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+               final DataInputView inView = openDataInputView();
+               final int keyGroup = keyGroupRange.getStartKeyGroup();
+               final int numNamespaces = inView.readInt();
+
+               for (int i = 0; i < numNamespaces; i++) {
+                       N namespace = namespaceSerializer.deserialize(inView);
+                       if (null == namespace) {
+                               namespace = (N) VoidNamespace.INSTANCE;
+                       }
+                       final int numKV = inView.readInt();
+                       for (int j = 0; j < numKV; j++) {
+                               K key = keySerializer.deserialize(inView);
+                               S value = stateSerializer.deserialize(inView);
+                               stateTable.put(key, keyGroup, namespace, value);
+                       }
+               }
+               return stateTable;
+       }
+
+       /**
+        * Different state handles require different code to end up with a {@link DataInputView}.
+        */
+       protected abstract DataInputView openDataInputView() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
new file mode 100644
index 0000000..ea529db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MigrationRestoreSnapshot.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.util.Migration;
+
+import java.io.IOException;
+
+@Deprecated
+@Internal
+public interface MigrationRestoreSnapshot<K, N, S> extends Migration {
+       StateTable<K, N, S> deserialize(String stateName, HeapKeyedStateBackend<K> stateBackend) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index aba00f3..1f2f4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -254,6 +255,7 @@ public abstract class AbstractKeyedStateBackend<K>
        /**
         * @see KeyedStateBackend
         */
+       @Override
        public KeyGroupRange getKeyGroupRange() {
                return keyGroupRange;
        }
@@ -382,4 +384,9 @@ public abstract class AbstractKeyedStateBackend<K>
        public void close() throws IOException {
                cancelStreamRegistry.close();
        }
+
+       @VisibleForTesting
+       public boolean supportsAsynchronousSnapshots() {
+               return false;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index f5a6405..0badb41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -57,11 +57,17 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
        @Override
        public ArrayList<T> copy(ArrayList<T> from) {
-               ArrayList<T> newList = new ArrayList<>(from.size());
-               for (int i = 0; i < from.size(); i++) {
-                       newList.add(elementSerializer.copy(from.get(i)));
+               if (elementSerializer.isImmutableType()) {
+                       // fast track using memcopy for immutable types
+                       return new ArrayList<>(from);
+               } else {
+                       // element-wise deep copy for mutable types
+                       ArrayList<T> newList = new ArrayList<>(from.size());
+                       for (int i = 0; i < from.size(); i++) {
+                               newList.add(elementSerializer.copy(from.get(i)));
+                       }
+                       return newList;
                }
-               return newList;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index dbee6cb..5661c38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -37,11 +38,12 @@ import java.util.List;
  */
 public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable {
 
-       private static final int VERSION = 1;
+       public static final int VERSION = 2;
 
        private TypeSerializerSerializationProxy<?> keySerializerProxy;
        private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
 
+       private int restoredVersion;
        private ClassLoader userCodeClassLoader;
 
        public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -51,6 +53,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, 
List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
                this.keySerializerProxy = new 
TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
                this.namedStateSerializationProxies = 
Preconditions.checkNotNull(namedStateSerializationProxies);
+               this.restoredVersion = VERSION;
                
Preconditions.checkArgument(namedStateSerializationProxies.size() <= 
Short.MAX_VALUE);
        }
 
@@ -67,6 +70,22 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                return VERSION;
        }
 
+       public int getRestoredVersion() {
+               return restoredVersion;
+       }
+
+       @Override
+       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
+               super.resolveVersionRead(foundVersion);
+               this.restoredVersion = foundVersion;
+       }
+
+       @Override
+       public boolean isCompatibleVersion(int version) {
+               // we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
+               return super.isCompatibleVersion(version) || version == 1;
+       }
+
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
@@ -96,7 +115,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                }
        }
 
-//----------------------------------------------------------------------------------------------------------------------
+       
//----------------------------------------------------------------------------------------------------------------------
 
        /**
         * This is the serialization proxy for {@link 
RegisteredBackendStateMetaInfo} for a single registered state in a

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 15e0491..09e27e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
  * @param <K> The key by which state is keyed.
  */
-public interface KeyedStateBackend<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
 
        /**
         * Sets the current key that is used for partitioned state.
@@ -36,31 +37,6 @@ public interface KeyedStateBackend<K> {
        void setCurrentKey(K newKey);
 
        /**
-        * Used by states to access the current key.
-        */
-       K getCurrentKey();
-
-       /**
-        * Returns the key-group to which the current key belongs.
-        */
-       int getCurrentKeyGroupIndex();
-
-       /**
-        * Returns the number of key-groups aka max parallelism.
-        */
-       int getNumberOfKeyGroups();
-
-       /**
-        * Returns the key groups for this backend.
-        */
-       KeyGroupsList getKeyGroupRange();
-
-       /**
-        * {@link TypeSerializer} for the state backend key type.
-        */
-       TypeSerializer<K> getKeySerializer();
-
-       /**
         * Creates or retrieves a keyed state backed by this state backend.
         *
         * @param namespaceSerializer The serializer used for the namespace 
type of the state

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..9e12ee5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface StateTransformationFunction<S, T> {
+       S apply(S previousState, T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2e9198f..e27712c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -66,7 +66,10 @@ public class FsStateBackend extends AbstractStateBackend {
 
        /** State below this size will be stored as part of the metadata, 
rather than in files */
        private final int fileStateThreshold;
-       
+
+       /** Switch to choose between synchronous and asynchronous snapshots */
+       private final boolean asynchronousSnapshots;
+
        /**
         * Creates a new state backend that stores its checkpoint data in the 
file system and location
         * defined by the given URI.
@@ -99,6 +102,27 @@ public class FsStateBackend extends AbstractStateBackend {
         *
         * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
         *                          and the path to the checkpoint data 
directory.
+        * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+        *
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(String checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
+               this(new Path(checkpointDataUri), asynchronousSnapshots);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to the checkpoint data 
directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -118,10 +142,52 @@ public class FsStateBackend extends AbstractStateBackend {
         *
         * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
         *                          and the path to the checkpoint data 
directory.
+        * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+        *
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(Path checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
+               this(checkpointDataUri.toUri(), asynchronousSnapshots);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to the checkpoint data 
directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(URI checkpointDataUri) throws IOException {
-               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to the checkpoint data 
directory.
+        * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+        *
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(URI checkpointDataUri, boolean 
asynchronousSnapshots) throws IOException {
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, 
asynchronousSnapshots);
        }
 
        /**
@@ -139,17 +205,47 @@ public class FsStateBackend extends AbstractStateBackend {
         *                          and the path to the checkpoint data 
directory.
         * @param fileStateSizeThreshold State up to this size will be stored 
as part of the metadata,
         *                             rather than in files
-        * 
+        *
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         * @throws IllegalArgumentException Thrown, if the {@code 
fileStateSizeThreshold} is out of bounds.
         */
        public FsStateBackend(URI checkpointDataUri, int 
fileStateSizeThreshold) throws IOException {
+
+               this(checkpointDataUri, fileStateSizeThreshold, false);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to the checkpoint data 
directory.
+        * @param fileStateSizeThreshold State up to this size will be stored 
as part of the metadata,
+        *                             rather than in files
+        * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+        *
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(
+                       URI checkpointDataUri,
+                       int fileStateSizeThreshold,
+                       boolean asynchronousSnapshots) throws IOException {
+
                checkArgument(fileStateSizeThreshold >= 0, "The threshold for 
file state size must be zero or larger.");
-               checkArgument(fileStateSizeThreshold <= 
MAX_FILE_STATE_THRESHOLD, 
+               checkArgument(fileStateSizeThreshold <= 
MAX_FILE_STATE_THRESHOLD,
                                "The threshold for file state size cannot be 
larger than %s", MAX_FILE_STATE_THRESHOLD);
 
                this.fileStateThreshold = fileStateSizeThreshold;
                this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+               this.asynchronousSnapshots = asynchronousSnapshots;
        }
 
        /**
@@ -166,9 +262,9 @@ public class FsStateBackend extends AbstractStateBackend {
         * Gets the threshold below which state is stored as part of the 
metadata, rather than in files.
         * This threshold ensures that the backend does not create a large 
amount of very small files,
         * where potentially the file pointers are larger than the state itself.
-        * 
+        *
         * <p>By default, this threshold is {@value 
#DEFAULT_FILE_STATE_THRESHOLD}.
-        * 
+        *
         * @return The file size threshold, in bytes.
         */
        public int getMinFileSizeThreshold() {
@@ -209,6 +305,7 @@ public class FsStateBackend extends AbstractStateBackend {
                                env.getUserClassLoader(),
                                numberOfKeyGroups,
                                keyGroupRange,
+                               asynchronousSnapshots,
                                env.getExecutionConfig());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 4ac7125..3e76423 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -22,18 +22,15 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 
 import java.util.Collection;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for {@link MergingState} ({@link 
org.apache.flink.runtime.state.internal.InternalMergingState})
  * that is stored on the heap.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -45,21 +42,25 @@ public abstract class AbstractHeapMergingState<K, N, IN, 
OUT, SV, S extends Stat
                implements InternalMergingState<N, IN, OUT> {
 
        /**
+        * The merge transformation function that implements the merge logic.
+        */
+       private final MergeTransformation mergeTransformation;
+
+       /**
         * Creates a new key/value state for the given hash map of key/value 
pairs.
         *
-        * @param backend The state backend backing that created this state.
         * @param stateDesc The state identifier for the state. This contains 
name
         *                           and can create a default state value.
         * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
         */
        protected AbstractHeapMergingState(
-                       KeyedStateBackend<K> backend,
                        SD stateDesc,
                        StateTable<K, N, SV> stateTable,
                        TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer) {
 
-               super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
+               super(stateDesc, stateTable, keySerializer, 
namespaceSerializer);
+               this.mergeTransformation = new MergeTransformation();
        }
 
        @Override
@@ -68,56 +69,40 @@ public abstract class AbstractHeapMergingState<K, N, IN, 
OUT, SV, S extends Stat
                        return; // nothing to do
                }
 
-               final K key = backend.getCurrentKey();
-               checkState(key != null, "No key set.");
-
-               final Map<N, Map<K, SV>> namespaceMap = 
stateTable.get(backend.getCurrentKeyGroupIndex());
-
-               if (namespaceMap != null) {
-                       SV merged = null;
-
-                       // merge the sources
-                       for (N source : sources) {
-                               Map<K, SV> keysForNamespace = 
namespaceMap.get(source);
-                               if (keysForNamespace != null) {
-                                       // get and remove the next source per 
namespace/key
-                                       SV sourceState = 
keysForNamespace.remove(key);
-
-                                       // if the namespace map became empty, 
remove 
-                                       if (keysForNamespace.isEmpty()) {
-                                               namespaceMap.remove(source);
-                                       }
-
-                                       if (merged != null && sourceState != 
null) {
-                                               merged = mergeState(merged, 
sourceState);
-                                       }
-                                       else if (merged == null) {
-                                               merged = sourceState;
-                                       }
-                               }
-                       }
+               final StateTable<K, N, SV> map = stateTable;
+
+               SV merged = null;
+
+               // merge the sources
+               for (N source : sources) {
 
-                       // merge into the target, if needed
-                       if (merged != null) {
-                               Map<K, SV> keysForTarget = 
namespaceMap.get(target);
-                               if (keysForTarget == null) {
-                                       keysForTarget = createNewMap();
-                                       namespaceMap.put(target, keysForTarget);
-                               }
-                               SV targetState = keysForTarget.get(key);
-
-                               if (targetState != null) {
-                                       targetState = mergeState(targetState, 
merged);
-                               }
-                               else {
-                                       targetState = merged;
-                               }
-                               keysForTarget.put(key, targetState);
+                       // get and remove the next source per namespace/key
+                       SV sourceState = map.removeAndGetOld(source);
+
+                       if (merged != null && sourceState != null) {
+                               merged = mergeState(merged, sourceState);
+                       } else if (merged == null) {
+                               merged = sourceState;
                        }
                }
 
-               // else no entries for that key at all, nothing to do skip
+               // merge into the target, if needed
+               if (merged != null) {
+                       map.transform(target, merged, mergeTransformation);
+               }
        }
 
        protected abstract SV mergeState(SV a, SV b) throws Exception;
-}
+
+       final class MergeTransformation implements 
StateTransformationFunction<SV, SV> {
+
+               @Override
+               public SV apply(SV targetState, SV merged) throws Exception {
+                       if (targetState != null) {
+                               return mergeState(targetState, merged);
+                       } else {
+                               return merged;
+                       }
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 18b71de..7e1123d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -25,18 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Base class for partitioned {@link ListState} implementations that are 
backed by a regular
  * heap hash map. The concrete implementations define how the state is 
checkpointed.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <SV> The type of the values in the state.
@@ -53,9 +48,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends 
State, SD extends St
        protected final SD stateDesc;
 
        /** The current namespace, which the access methods will refer to. */
-       protected N currentNamespace = null;
-
-       protected final KeyedStateBackend<K> backend;
+       protected N currentNamespace;
 
        protected final TypeSerializer<K> keySerializer;
 
@@ -64,58 +57,28 @@ public abstract class AbstractHeapState<K, N, SV, S extends 
State, SD extends St
        /**
         * Creates a new key/value state for the given hash map of key/value 
pairs.
         *
-        * @param backend The state backend backing that created this state.
         * @param stateDesc The state identifier for the state. This contains 
name
         *                           and can create a default state value.
         * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
         */
        protected AbstractHeapState(
-                       KeyedStateBackend<K> backend,
                        SD stateDesc,
                        StateTable<K, N, SV> stateTable,
                        TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer) {
 
-               Preconditions.checkNotNull(stateTable, "State table must not be 
null.");
-
-               this.backend = backend;
                this.stateDesc = stateDesc;
-               this.stateTable = stateTable;
+               this.stateTable = Preconditions.checkNotNull(stateTable, "State 
table must not be null.");
                this.keySerializer = keySerializer;
                this.namespaceSerializer = namespaceSerializer;
+               this.currentNamespace = null;
        }
 
        // 
------------------------------------------------------------------------
 
        @Override
        public final void clear() {
-               Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
-               Preconditions.checkState(backend.getCurrentKey() != null, "No 
key set.");
-
-               Map<N, Map<K, SV>> namespaceMap =
-                               
stateTable.get(backend.getCurrentKeyGroupIndex());
-
-               if (namespaceMap == null) {
-                       return;
-               }
-
-               Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-               if (keyedMap == null) {
-                       return;
-               }
-
-               SV removed = keyedMap.remove(backend.getCurrentKey());
-
-               if (removed == null) {
-                       return;
-               }
-
-               if (!keyedMap.isEmpty()) {
-                       return;
-               }
-
-               namespaceMap.remove(currentNamespace);
+               stateTable.remove(currentNamespace);
        }
 
        @Override
@@ -137,20 +100,7 @@ public abstract class AbstractHeapState<K, N, SV, S 
extends State, SD extends St
                Preconditions.checkState(namespace != null, "No namespace 
given.");
                Preconditions.checkState(key != null, "No key given.");
 
-               Map<N, Map<K, SV>> namespaceMap =
-                               
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, 
backend.getNumberOfKeyGroups()));
-
-               if (namespaceMap == null) {
-                       return null;
-               }
-
-               Map<K, SV> keyedMap = namespaceMap.get(currentNamespace);
-
-               if (keyedMap == null) {
-                       return null;
-               }
-
-               SV result = keyedMap.get(key);
+               SV result = stateTable.get(key, namespace);
 
                if (result == null) {
                        return null;
@@ -158,30 +108,14 @@ public abstract class AbstractHeapState<K, N, SV, S 
extends State, SD extends St
 
                @SuppressWarnings("unchecked,rawtypes")
                TypeSerializer serializer = stateDesc.getSerializer();
-
                return KvStateRequestSerializer.serializeValue(result, 
serializer);
        }
 
        /**
-        * Creates a new map for use in Heap based state.
-        *
-        * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, 
this
-        * will create a concurrent hash map instead of a regular one.
-        *
-        * @return A new namespace map.
-        */
-       protected <MK, MV> Map<MK, MV> createNewMap() {
-               if (stateDesc.isQueryable()) {
-                       return new ConcurrentHashMap<>();
-               } else {
-                       return new HashMap<>();
-               }
-       }
-
-       /**
         * This should only be used for testing.
         */
+       @VisibleForTesting
        public StateTable<K, N, SV> getStateTable() {
                return stateTable;
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..b0d7727
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how
+ * the snapshot is written during the serialization phase of checkpointing.
+ */
+@Internal
+abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot {
+
+       /**
+        * The {@link StateTable} from which this snapshot was created.
+        */
+       final T owningStateTable;
+
+       /**
+        * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table.
+        *
+        * @param owningStateTable the {@link StateTable} for which this object represents a snapshot.
+        */
+       AbstractStateTableSnapshot(T owningStateTable) {
+               this.owningStateTable = Preconditions.checkNotNull(owningStateTable);
+       }
+
+       /**
+        * Optional hook to release resources for this snapshot at the end of its lifecycle.
+        */
+       @Override
+       public void release() {
+       }
+}
\ No newline at end of file

Reply via email to