[FLINK-6545] [checkpoint] Make incremental checkpoints externalizable

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/098e46f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/098e46f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/098e46f2

Branch: refs/heads/master
Commit: 098e46f2d222e8e6c5c27bc7ded40ee642dad104
Parents: efbb41b
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Thu May 11 21:04:29 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200

----------------------------------------------------------------------
 .../RocksDBIncrementalKeyedStateHandle.java     | 241 --------------
 .../state/RocksDBKeyedStateBackend.java         | 155 ++++-----
 .../org/apache/flink/util/StringBasedID.java    |  69 ++++
 .../savepoint/SavepointV2Serializer.java        |  83 ++++-
 .../state/IncrementalKeyedStateHandle.java      | 324 +++++++++++++++++++
 .../state/PlaceholderStreamStateHandle.java     |  88 +++++
 .../flink/runtime/state/SharedStateHandle.java  |  39 ---
 .../runtime/state/SharedStateRegistry.java      |   2 +-
 .../runtime/state/SharedStateRegistryKey.java   |  42 +--
 .../apache/flink/runtime/state/StateHandle.java |  37 ---
 .../flink/runtime/state/StateHandleID.java      |  37 +++
 .../savepoint/CheckpointTestUtils.java          |  87 +++--
 .../savepoint/SavepointV2SerializerTest.java    |   1 -
 .../flink/runtime/state/StateUtilTest.java      |  36 ---
 14 files changed, 736 insertions(+), 505 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
deleted file mode 100644
index 961182d..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.SharedStateRegistryKey;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}.
- *
- * The states contained in an incremental snapshot include
- * <ul>
- * <li> New SST state which includes the sst files produced since the last 
completed
- *   checkpoint. These files can be referenced by succeeding checkpoints if the
- *   checkpoint succeeds to complete. </li>
- * <li> Old SST state which includes the sst files materialized in previous
- *   checkpoints. </li>
- * <li> MISC state which include the other files in the RocksDB instance, e.g. 
the
- *   LOG and MANIFEST files. These files are mutable, hence cannot be shared by
- *   other checkpoints. </li>
- * <li> Meta state which includes the information of existing states. </li>
- * </ul>
- */
-public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBIncrementalKeyedStateHandle.class);
-
-       private static final long serialVersionUID = -8328808513197388231L;
-
-       private final String operatorIdentifier;
-
-       private final KeyGroupRange keyGroupRange;
-
-       private final long checkpointId;
-
-       private final Map<String, StreamStateHandle> unregisteredSstFiles;
-
-       private final Map<String, StreamStateHandle> registeredSstFiles;
-
-       private final Map<String, StreamStateHandle> miscFiles;
-
-       private final StreamStateHandle metaStateHandle;
-
-       /**
-        * True if the state handle has already registered shared states.
-        *
-        * Once the shared states are registered, it's the {@link 
SharedStateRegistry}'s
-        * responsibility to maintain the shared states. But in the cases where 
the
-        * state handle is discarded before performing the registration, the 
handle
-        * should delete all the shared states created by it.
-        */
-       private boolean registered;
-
-       RocksDBIncrementalKeyedStateHandle(
-                       String operatorIdentifier,
-                       KeyGroupRange keyGroupRange,
-                       long checkpointId,
-                       Map<String, StreamStateHandle> unregisteredSstFiles,
-                       Map<String, StreamStateHandle> registeredSstFiles,
-                       Map<String, StreamStateHandle> miscFiles,
-                       StreamStateHandle metaStateHandle) {
-
-               this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
-               this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-               this.checkpointId = checkpointId;
-               this.unregisteredSstFiles = 
Preconditions.checkNotNull(unregisteredSstFiles);
-               this.registeredSstFiles = 
Preconditions.checkNotNull(registeredSstFiles);
-               this.miscFiles = Preconditions.checkNotNull(miscFiles);
-               this.metaStateHandle = 
Preconditions.checkNotNull(metaStateHandle);
-               this.registered = false;
-       }
-
-       @Override
-       public KeyGroupRange getKeyGroupRange() {
-               return keyGroupRange;
-       }
-
-       long getCheckpointId() {
-               return checkpointId;
-       }
-
-       Map<String, StreamStateHandle> getUnregisteredSstFiles() {
-               return unregisteredSstFiles;
-       }
-
-       Map<String, StreamStateHandle> getRegisteredSstFiles() {
-               return registeredSstFiles;
-       }
-
-       Map<String, StreamStateHandle> getMiscFiles() {
-               return miscFiles;
-       }
-
-       StreamStateHandle getMetaStateHandle() {
-               return metaStateHandle;
-       }
-
-       @Override
-       public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-               if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-                       return this;
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public void discardState() throws Exception {
-
-               Preconditions.checkState(!registered, "Attempt to dispose a 
registered composite state with registered shared state. Must unregister 
first.");
-
-               try {
-                       metaStateHandle.discardState();
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard meta data.", e);
-               }
-
-               try {
-                       
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard misc file 
states.", e);
-               }
-
-               try {
-                       
StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values());
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard new sst file 
states.", e);
-               }
-
-       }
-
-       @Override
-       public long getStateSize() {
-               long size = StateUtil.getStateSize(metaStateHandle);
-
-               for (StreamStateHandle newSstFileHandle : 
unregisteredSstFiles.values()) {
-                       size += newSstFileHandle.getStateSize();
-               }
-
-               for (StreamStateHandle oldSstFileHandle : 
registeredSstFiles.values()) {
-                       size += oldSstFileHandle.getStateSize();
-               }
-
-               for (StreamStateHandle miscFileHandle : miscFiles.values()) {
-                       size += miscFileHandle.getStateSize();
-               }
-
-               return size;
-       }
-
-       @Override
-       public void registerSharedStates(SharedStateRegistry stateRegistry) {
-
-               Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
-
-               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-
-                       SharedStateRegistry.Result result =
-                               stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
-
-                       // We update our reference with the result from the 
registry, to prevent the following
-                       // problem:
-                       // A previous checkpoint n has already registered the 
state. This can happen if a
-                       // following checkpoint (n + x) wants to reference the 
same state before the backend got
-                       // notified that checkpoint n completed. In this case, 
the shared registry did
-                       // deduplication and returns the previous reference.
-                       newSstFileEntry.setValue(result.getReference());
-               }
-
-               for (Map.Entry<String, StreamStateHandle> oldSstFileName : 
registeredSstFiles.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
-                       SharedStateRegistry.Result result = 
stateRegistry.obtainReference(registryKey);
-
-                       // Again we update our state handle with the result 
from the registry, thus replacing
-                       // placeholder state handles with the originals.
-                       oldSstFileName.setValue(result.getReference());
-               }
-
-               // Migrate state from unregistered to registered, so that it 
will not count as private state
-               // for #discardState() from now.
-               registeredSstFiles.putAll(unregisteredSstFiles);
-               unregisteredSstFiles.clear();
-
-               registered = true;
-       }
-
-       @Override
-       public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
-               Preconditions.checkState(registered, "The state handle has not 
registered its shared states yet.");
-
-               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-                       stateRegistry.releaseReference(registryKey);
-               }
-
-               for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
registeredSstFiles.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
-                       stateRegistry.releaseReference(registryKey);
-               }
-
-               registered = false;
-       }
-
-       private SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(String fileName) {
-               return new SharedStateRegistryKey(operatorIdentifier + "-" + 
keyGroupRange + "-" + fileName);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b9468f7..4bd94fd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -55,13 +55,16 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
@@ -72,7 +75,6 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
@@ -172,7 +174,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        private final boolean enableIncrementalCheckpointing;
 
        /** The sst files materialized in pending checkpoints */
-       private final SortedMap<Long, Map<String, StreamStateHandle>> 
materializedSstFiles = new TreeMap<>();
+       private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
materializedSstFiles = new TreeMap<>();
 
        /** The identifier of the last completed checkpoint */
        private long lastCompletedCheckpointId = -1;
@@ -723,7 +725,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                private final long checkpointTimestamp;
 
                /** All sst files that were part of the last previously 
completed checkpoint */
-               private Map<String, StreamStateHandle> baseSstFiles;
+               private Map<StateHandleID, StreamStateHandle> baseSstFiles;
 
                /** The state meta data */
                private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
@@ -735,13 +737,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
                // new sst files since the last completed checkpoint
-               private final Map<String, StreamStateHandle> newSstFiles = new 
HashMap<>();
+               private final Map<StateHandleID, StreamStateHandle> newSstFiles 
= new HashMap<>();
 
                // old sst files which have been materialized in previous 
completed checkpoints
-               private final Map<String, StreamStateHandle> oldSstFiles = new 
HashMap<>();
+               private final Map<StateHandleID, StreamStateHandle> oldSstFiles 
= new HashMap<>();
 
                // handles to the misc files in the current snapshot
-               private final Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
+               private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
 
                private StreamStateHandle metaStateHandle = null;
 
@@ -865,8 +867,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
                        if (fileStatuses != null) {
                                for (FileStatus fileStatus : fileStatuses) {
-                                       Path filePath = fileStatus.getPath();
-                                       String fileName = filePath.getName();
+                                       final Path filePath = 
fileStatus.getPath();
+                                       final String fileName = 
filePath.getName();
+                                       final StateHandleID stateHandleID = new 
StateHandleID(fileName);
 
                                        if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
                                                StreamStateHandle fileHandle =
@@ -874,20 +877,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                                                if (fileHandle == null) {
                                                        fileHandle = 
materializeStateData(filePath);
-                                                       
newSstFiles.put(fileName, fileHandle);
+                                                       
newSstFiles.put(stateHandleID, fileHandle);
                                                } else {
                                                        // we introduce a 
placeholder state handle, that is replaced with the
                                                        // original from the 
shared state registry (created from a previous checkpoint)
-                                                       
oldSstFiles.put(fileName, new 
PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+                                                       oldSstFiles.put(
+                                                               stateHandleID,
+                                                               new 
PlaceholderStreamStateHandle(fileHandle.getStateSize()));
                                                }
                                        } else {
                                                StreamStateHandle fileHandle = 
materializeStateData(filePath);
-                                               miscFiles.put(fileName, 
fileHandle);
+                                               miscFiles.put(stateHandleID, 
fileHandle);
                                        }
                                }
                        }
 
-                       Map<String, StreamStateHandle> sstFiles = new 
HashMap<>(newSstFiles.size() + oldSstFiles.size());
+                       Map<StateHandleID, StreamStateHandle> sstFiles =
+                               new HashMap<>(newSstFiles.size() + 
oldSstFiles.size());
+
                        sstFiles.putAll(newSstFiles);
                        sstFiles.putAll(oldSstFiles);
 
@@ -895,7 +902,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
                        }
 
-                       return new RocksDBIncrementalKeyedStateHandle(
+                       return new IncrementalKeyedStateHandle(
                                stateBackend.operatorIdentifier,
                                stateBackend.keyGroupRange,
                                checkpointId,
@@ -940,39 +947,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                }
                        }
                }
-
-               /**
-                * A placeholder state handle for shared state that will 
replaced by an original that was
-                * created in a previous checkpoint. So we don't have to send 
the handle twice, e.g. in
-                * case of {@link ByteStreamStateHandle}.
-                */
-               private static final class PlaceholderStreamStateHandle 
implements StreamStateHandle {
-
-                       private static final long serialVersionUID = 1L;
-
-                       /** We remember the size of the original file for which 
this is a placeholder */
-                       private final long originalSize;
-
-                       public PlaceholderStreamStateHandle(long originalSize) {
-                               this.originalSize = originalSize;
-                       }
-
-                       @Override
-                       public FSDataInputStream openInputStream() {
-                               throw new UnsupportedOperationException(
-                                       "This is only a placeholder to be 
replaced by a real StreamStateHandle in the checkpoint coordinator.");
-                       }
-
-                       @Override
-                       public void discardState() throws Exception {
-                               // nothing to do.
-                       }
-
-                       @Override
-                       public long getStateSize() {
-                               return originalSize;
-                       }
-               }
        }
 
        @Override
@@ -989,7 +963,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        } else if 
(MigrationUtil.isOldSavepointKeyedState(restoreState)) {
                                LOG.info("Converting RocksDB state from old 
savepoint.");
                                restoreOldSavepointKeyedState(restoreState);
-                       } else if (restoreState.iterator().next() instanceof 
RocksDBIncrementalKeyedStateHandle) {
+                       } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
                                RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation(this);
                                restoreOperation.restore(restoreState);
                        } else {
@@ -1302,7 +1276,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                }
 
                private void restoreInstance(
-                               RocksDBIncrementalKeyedStateHandle 
restoreStateHandle,
+                               IncrementalKeyedStateHandle restoreStateHandle,
                                boolean hasExtraKeys) throws Exception {
 
                        // read state data
@@ -1311,29 +1285,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                UUID.randomUUID().toString());
 
                        try {
-                               Map<String, StreamStateHandle> newSstFiles = 
restoreStateHandle.getUnregisteredSstFiles();
-                               for (Map.Entry<String, StreamStateHandle> 
newSstFileEntry : newSstFiles.entrySet()) {
-                                       String fileName = 
newSstFileEntry.getKey();
-                                       StreamStateHandle remoteFileHandle = 
newSstFileEntry.getValue();
+                               final Map<StateHandleID, StreamStateHandle> 
newSstFiles =
+                                       
restoreStateHandle.getCreatedSharedState();
+                               final Map<StateHandleID, StreamStateHandle> 
oldSstFiles =
+                                       
restoreStateHandle.getReferencedSharedState();
+                               final Map<StateHandleID, StreamStateHandle> 
miscFiles =
+                                       restoreStateHandle.getPrivateState();
 
-                                       readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
-                               }
-
-                               Map<String, StreamStateHandle> oldSstFiles = 
restoreStateHandle.getRegisteredSstFiles();
-                               for (Map.Entry<String, StreamStateHandle> 
oldSstFileEntry : oldSstFiles.entrySet()) {
-                                       String fileName = 
oldSstFileEntry.getKey();
-                                       StreamStateHandle remoteFileHandle = 
oldSstFileEntry.getValue();
-
-                                       readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
-                               }
-
-                               Map<String, StreamStateHandle> miscFiles = 
restoreStateHandle.getMiscFiles();
-                               for (Map.Entry<String, StreamStateHandle> 
miscFileEntry : miscFiles.entrySet()) {
-                                       String fileName = 
miscFileEntry.getKey();
-                                       StreamStateHandle remoteFileHandle = 
miscFileEntry.getValue();
-
-                                       readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
-                               }
+                               readAllStateData(newSstFiles, 
restoreInstancePath);
+                               readAllStateData(oldSstFiles, 
restoreInstancePath);
+                               readAllStateData(miscFiles, 
restoreInstancePath);
 
                                // read meta data
                                
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
@@ -1425,26 +1386,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                                throw new IOException("Could 
not create RocksDB data directory.");
                                        }
 
-                                       for (String newSstFileName : 
newSstFiles.keySet()) {
-                                               File restoreFile = new 
File(restoreInstancePath.getPath(), newSstFileName);
-                                               File targetFile = new 
File(stateBackend.instanceRocksDBPath, newSstFileName);
-
-                                               
Files.createLink(targetFile.toPath(), restoreFile.toPath());
-                                       }
-
-                                       for (String oldSstFileName : 
oldSstFiles.keySet()) {
-                                               File restoreFile = new 
File(restoreInstancePath.getPath(), oldSstFileName);
-                                               File targetFile = new 
File(stateBackend.instanceRocksDBPath, oldSstFileName);
-
-                                               
Files.createLink(targetFile.toPath(), restoreFile.toPath());
-                                       }
-
-                                       for (String miscFileName : 
miscFiles.keySet()) {
-                                               File restoreFile = new 
File(restoreInstancePath.getPath(), miscFileName);
-                                               File targetFile = new 
File(stateBackend.instanceRocksDBPath, miscFileName);
-
-                                               
Files.createLink(targetFile.toPath(), restoreFile.toPath());
-                                       }
+                                       
createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
+                                       
createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+                                       
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
                                        List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>();
                                        stateBackend.db = stateBackend.openDB(
@@ -1470,7 +1414,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 
                                        // use the restore sst files as the 
base for succeeding checkpoints
-                                       Map<String, StreamStateHandle> sstFiles 
= new HashMap<>();
+                                       Map<StateHandleID, StreamStateHandle> 
sstFiles = new HashMap<>();
                                        sstFiles.putAll(newSstFiles);
                                        sstFiles.putAll(oldSstFiles);
                                        
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles);
@@ -1485,6 +1429,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        }
                }
 
+               private void readAllStateData(
+                       Map<StateHandleID, StreamStateHandle> stateHandleMap,
+                       Path restoreInstancePath) throws IOException {
+
+                       for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
+                               StateHandleID stateHandleID = entry.getKey();
+                               StreamStateHandle remoteFileHandle = 
entry.getValue();
+                               readStateData(new Path(restoreInstancePath, 
stateHandleID.toString()), remoteFileHandle);
+                       }
+               }
+
+               private void createFileHardLinksInRestorePath(
+                       Map<StateHandleID, StreamStateHandle> stateHandleMap,
+                       Path restoreInstancePath) throws IOException {
+
+                       for (StateHandleID stateHandleID : 
stateHandleMap.keySet()) {
+                               String newSstFileName = 
stateHandleID.toString();
+                               File restoreFile = new 
File(restoreInstancePath.getPath(), newSstFileName);
+                               File targetFile = new 
File(stateBackend.instanceRocksDBPath, newSstFileName);
+                               Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
+                       }
+               }
+
                void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
 
                        boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
@@ -1496,13 +1463,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
 
-                               if (! (rawStateHandle instanceof 
RocksDBIncrementalKeyedStateHandle)) {
+                               if (! (rawStateHandle instanceof 
IncrementalKeyedStateHandle)) {
                                        throw new 
IllegalStateException("Unexpected state handle type, " +
-                                               "expected " + 
RocksDBIncrementalKeyedStateHandle.class +
+                                               "expected " + 
IncrementalKeyedStateHandle.class +
                                                ", but found " + 
rawStateHandle.getClass());
                                }
 
-                               RocksDBIncrementalKeyedStateHandle 
keyedStateHandle = (RocksDBIncrementalKeyedStateHandle) rawStateHandle;
+                               IncrementalKeyedStateHandle keyedStateHandle = 
(IncrementalKeyedStateHandle) rawStateHandle;
 
                                restoreInstance(keyedStateHandle, hasExtraKeys);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java 
b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
new file mode 100644
index 0000000..7245e61
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Serializable;
+
+/**
+ * Base class for typed IDs that are internally represented by a string. This 
class is not intended
+ * for direct use, but should be subclassed for type-safety.
+ */
+public class StringBasedID implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Uses a String as internal representation
+        */
+       private final String keyString;
+
+       /**
+        * Protected constructor to enforce subclassing.
+        */
+       protected StringBasedID(String keyString) {
+               this.keyString = Preconditions.checkNotNull(keyString);
+       }
+
+       public String getKeyString() {
+               return keyString;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               StringBasedID that = (StringBasedID) o;
+               return keyString.equals(that.keyString);
+       }
+
+       @Override
+       public int hashCode() {
+               return keyString.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return keyString;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 1b5f2c6..b71418b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -20,14 +20,17 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -71,6 +74,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
        private static final byte FILE_STREAM_STATE_HANDLE = 2;
        private static final byte KEY_GROUPS_HANDLE = 3;
        private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+       private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
+       private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
 
        /** The singleton instance of the serializer */
        public static final SavepointV2Serializer INSTANCE = new 
SavepointV2Serializer();
@@ -287,7 +292,6 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
 
                KeyedStateHandle keyedStateStream = 
deserializeKeyedStateHandle(dis);
 
-
                return new OperatorSubtaskState(
                                nonPartitionableState,
                                operatorStateBackend,
@@ -311,19 +315,63 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
                        }
                        
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+               } else if (stateHandle instanceof IncrementalKeyedStateHandle) {
+                       IncrementalKeyedStateHandle incrementalKeyedStateHandle 
=
+                               (IncrementalKeyedStateHandle) stateHandle;
+
+                       dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
+
+                       
dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
+                       
dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier());
+                       
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
+                       
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+
+                       
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), 
dos);
+
+                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(),
 dos);
+                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(),
 dos);
+                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), 
dos);
                } else {
                        throw new IllegalStateException("Unknown 
KeyedStateHandle type: " + stateHandle.getClass());
                }
        }
 
+       private static void serializeStreamStateHandleMap(
+               Map<StateHandleID, StreamStateHandle> map,
+               DataOutputStream dos) throws IOException {
+               dos.writeInt(map.size());
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
map.entrySet()) {
+                       dos.writeUTF(entry.getKey().toString());
+                       serializeStreamStateHandle(entry.getValue(), dos);
+               }
+       }
+
+       private static Map<StateHandleID, StreamStateHandle> 
deserializeStreamStateHandleMap(
+               DataInputStream dis) throws IOException {
+
+               final int size = dis.readInt();
+               Map<StateHandleID, StreamStateHandle> result = new 
HashMap<>(size);
+
+               for (int i = 0; i < size; ++i) {
+                       StateHandleID stateHandleID = new 
StateHandleID(dis.readUTF());
+                       StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis);
+                       result.put(stateHandleID, stateHandle);
+               }
+
+               return result;
+       }
+
        private static KeyedStateHandle 
deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
                final int type = dis.readByte();
                if (NULL_HANDLE == type) {
+
                        return null;
                } else if (KEY_GROUPS_HANDLE == type) {
+
                        int startKeyGroup = dis.readInt();
                        int numKeyGroups = dis.readInt();
-                       KeyGroupRange keyGroupRange = 
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+                       KeyGroupRange keyGroupRange =
+                               KeyGroupRange.of(startKeyGroup, startKeyGroup + 
numKeyGroups - 1);
                        long[] offsets = new long[numKeyGroups];
                        for (int i = 0; i < numKeyGroups; ++i) {
                                offsets[i] = dis.readLong();
@@ -332,6 +380,28 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                keyGroupRange, offsets);
                        StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis);
                        return new KeyGroupsStateHandle(keyGroupRangeOffsets, 
stateHandle);
+               } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
+
+                       long checkpointId = dis.readLong();
+                       String operatorId = dis.readUTF();
+                       int startKeyGroup = dis.readInt();
+                       int numKeyGroups = dis.readInt();
+                       KeyGroupRange keyGroupRange =
+                               KeyGroupRange.of(startKeyGroup, startKeyGroup + 
numKeyGroups - 1);
+
+                       StreamStateHandle metaDataStateHandle = 
deserializeStreamStateHandle(dis);
+                       Map<StateHandleID, StreamStateHandle> createdStates = 
deserializeStreamStateHandleMap(dis);
+                       Map<StateHandleID, StreamStateHandle> referencedStates 
= deserializeStreamStateHandleMap(dis);
+                       Map<StateHandleID, StreamStateHandle> privateStates = 
deserializeStreamStateHandleMap(dis);
+
+                       return new IncrementalKeyedStateHandle(
+                               operatorId,
+                               keyGroupRange,
+                               checkpointId,
+                               createdStates,
+                               referencedStates,
+                               privateStates,
+                               metaDataStateHandle);
                } else {
                        throw new IllegalStateException("Reading invalid 
KeyedStateHandle, type: " + type);
                }
@@ -415,7 +485,10 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        byte[] internalData = byteStreamStateHandle.getData();
                        dos.writeInt(internalData.length);
                        dos.write(byteStreamStateHandle.getData());
-
+               } else if (stateHandle instanceof PlaceholderStreamStateHandle) 
{
+                       PlaceholderStreamStateHandle placeholder = 
(PlaceholderStreamStateHandle) stateHandle;
+                       dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
+                       dos.writeLong(placeholder.getStateSize());
                } else {
                        throw new IOException("Unknown implementation of 
StreamStateHandle: " + stateHandle.getClass());
                }
@@ -437,6 +510,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        byte[] data = new byte[numBytes];
                        dis.readFully(data);
                        return new ByteStreamStateHandle(handleName, data);
+               } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
+                       return new PlaceholderStreamStateHandle(dis.readLong());
                } else {
                        throw new IOException("Unknown implementation of 
StreamStateHandle, code: " + type);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
new file mode 100644
index 0000000..706e219
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * The handle to states of an incremental snapshot.
+ * <p>
+ * The states contained in an incremental snapshot include
+ * <ul>
+ * <li> Created shared state, which includes the (intended to be) shared files 
produced since the last
+ * completed checkpoint. These files can be referenced by succeeding 
checkpoints if this
+ * checkpoint completes successfully. </li>
+ * <li> Referenced shared state which includes the shared files materialized 
in previous
+ * checkpoints. </li>
+ * <li> Private state which includes all other files, typically mutable, that 
cannot be shared by
+ * other checkpoints. </li>
+ * <li> Backend meta state which includes the information of existing states. 
</li>
+ * </ul>
+ *
+ * IMPORTANT: This class currently overrides equals and hash code only for 
testing purposes. They
+ * should not be called from production code. This means this class is also 
not suited to serve as
+ * a key, e.g. in hash maps.
+ */
+public class IncrementalKeyedStateHandle implements KeyedStateHandle {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalKeyedStateHandle.class);
+
+       private static final long serialVersionUID = -8328808513197388231L;
+
+       /**
+        * The operator instance identifier for this handle
+        */
+       private final String operatorIdentifier;
+
+       /**
+        * The key-group range covered by this state handle
+        */
+       private final KeyGroupRange keyGroupRange;
+
+       /**
+        * The checkpoint Id
+        */
+       private final long checkpointId;
+
+       /**
+        * Shared state that was newly created by this incremental checkpoint
+        */
+       private final Map<StateHandleID, StreamStateHandle> createdSharedState;
+
+       /**
+        * State that the incremental checkpoint references from previous 
checkpoints
+        */
+       private final Map<StateHandleID, StreamStateHandle> 
referencedSharedState;
+
+       /**
+        * Private state in the incremental checkpoint
+        */
+       private final Map<StateHandleID, StreamStateHandle> privateState;
+
+       /**
+        * Primary meta data state of the incremental checkpoint
+        */
+       private final StreamStateHandle metaStateHandle;
+
+       /**
+        * True if the state handle has already registered shared states.
+        * <p>
+        * Once the shared states are registered, it's the {@link 
SharedStateRegistry}'s
+        * responsibility to maintain the shared states. But in the cases where 
the
+        * state handle is discarded before performing the registration, the 
handle
+        * should delete all the shared states created by it.
+        */
+       private boolean registered;
+
+       public IncrementalKeyedStateHandle(
+               String operatorIdentifier,
+               KeyGroupRange keyGroupRange,
+               long checkpointId,
+               Map<StateHandleID, StreamStateHandle> createdSharedState,
+               Map<StateHandleID, StreamStateHandle> referencedSharedState,
+               Map<StateHandleID, StreamStateHandle> privateState,
+               StreamStateHandle metaStateHandle) {
+
+               this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
+               this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+               this.checkpointId = checkpointId;
+               this.createdSharedState = 
Preconditions.checkNotNull(createdSharedState);
+               this.referencedSharedState = 
Preconditions.checkNotNull(referencedSharedState);
+               this.privateState = Preconditions.checkNotNull(privateState);
+               this.metaStateHandle = 
Preconditions.checkNotNull(metaStateHandle);
+               this.registered = false;
+       }
+
+       @Override
+       public KeyGroupRange getKeyGroupRange() {
+               return keyGroupRange;
+       }
+
+       public long getCheckpointId() {
+               return checkpointId;
+       }
+
+       public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
+               return createdSharedState;
+       }
+
+       public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() 
{
+               return referencedSharedState;
+       }
+
+       public Map<StateHandleID, StreamStateHandle> getPrivateState() {
+               return privateState;
+       }
+
+       public StreamStateHandle getMetaStateHandle() {
+               return metaStateHandle;
+       }
+
+       public String getOperatorIdentifier() {
+               return operatorIdentifier;
+       }
+
+       @Override
+       public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+               if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+                       return this;
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       public void discardState() throws Exception {
+
+               Preconditions.checkState(!registered, "Attempt to dispose a 
registered composite state with registered shared state. Must unregister 
first.");
+
+               try {
+                       metaStateHandle.discardState();
+               } catch (Exception e) {
+                       LOG.warn("Could not properly discard meta data.", e);
+               }
+
+               try {
+                       
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
+               } catch (Exception e) {
+                       LOG.warn("Could not properly discard misc file 
states.", e);
+               }
+
+               try {
+                       
StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
+               } catch (Exception e) {
+                       LOG.warn("Could not properly discard new sst file 
states.", e);
+               }
+
+       }
+
+       @Override
+       public long getStateSize() {
+               long size = getPrivateStateSize();
+
+               for (StreamStateHandle oldSstFileHandle : 
referencedSharedState.values()) {
+                       size += oldSstFileHandle.getStateSize();
+               }
+
+               return size;
+       }
+
+       /**
+        * Returns the size of the state that is privately owned by this handle.
+        */
+       public long getPrivateStateSize() {
+               long size = StateUtil.getStateSize(metaStateHandle);
+
+               for (StreamStateHandle newSstFileHandle : 
createdSharedState.values()) {
+                       size += newSstFileHandle.getStateSize();
+               }
+
+               for (StreamStateHandle miscFileHandle : privateState.values()) {
+                       size += miscFileHandle.getStateSize();
+               }
+
+               return size;
+       }
+
+       @Override
+       public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
+               Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
newSstFileEntry : createdSharedState.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+
+                       SharedStateRegistry.Result result =
+                               stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
+
+                       // We update our reference with the result from the 
registry, to prevent the following
+                       // problem:
+                       // A previous checkpoint n has already registered the 
state. This can happen if a
+                       // following checkpoint (n + x) wants to reference the 
same state before the backend got
+                       // notified that checkpoint n completed. In this case, 
the shared registry did
+                       // deduplication and returns the previous reference.
+                       newSstFileEntry.setValue(result.getReference());
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName 
: referencedSharedState.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+                       SharedStateRegistry.Result result = 
stateRegistry.obtainReference(registryKey);
+
+                       // Again we update our state handle with the result 
from the registry, thus replacing
+                       // placeholder state handles with the originals.
+                       oldSstFileName.setValue(result.getReference());
+               }
+
+               // Migrate state from unregistered to registered, so that it 
will not count as private state
+               // for #discardState() from now.
+               referencedSharedState.putAll(createdSharedState);
+               createdSharedState.clear();
+
+               registered = true;
+       }
+
+       @Override
+       public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
+               Preconditions.checkState(registered, "The state handle has not 
registered its shared states yet.");
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
newSstFileEntry : createdSharedState.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+                       stateRegistry.releaseReference(registryKey);
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
oldSstFileEntry : referencedSharedState.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
+                       stateRegistry.releaseReference(registryKey);
+               }
+
+               registered = false;
+       }
+
+       private SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+               return new SharedStateRegistryKey(operatorIdentifier + '-' + 
keyGroupRange, shId);
+       }
+
+       /**
+        * This method should only be called in tests! This should never 
serve as key in a hash map.
+        */
+       @VisibleForTesting
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               IncrementalKeyedStateHandle that = 
(IncrementalKeyedStateHandle) o;
+
+               if (getCheckpointId() != that.getCheckpointId()) {
+                       return false;
+               }
+               if 
(!getOperatorIdentifier().equals(that.getOperatorIdentifier())) {
+                       return false;
+               }
+               if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
+                       return false;
+               }
+               if 
(!getCreatedSharedState().equals(that.getCreatedSharedState())) {
+                       return false;
+               }
+               if 
(!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+                       return false;
+               }
+               if (!getPrivateState().equals(that.getPrivateState())) {
+                       return false;
+               }
+               return getMetaStateHandle().equals(that.getMetaStateHandle());
+       }
+
+       /**
+        * This method should only be called in tests! This should never serve 
as key in a hash map.
+        */
+       @VisibleForTesting
+       @Override
+       public int hashCode() {
+               int result = getOperatorIdentifier().hashCode();
+               result = 31 * result + getKeyGroupRange().hashCode();
+               result = 31 * result + (int) (getCheckpointId() ^ 
(getCheckpointId() >>> 32));
+               result = 31 * result + getCreatedSharedState().hashCode();
+               result = 31 * result + getReferencedSharedState().hashCode();
+               result = 31 * result + getPrivateState().hashCode();
+               result = 31 * result + getMetaStateHandle().hashCode();
+               return result;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
new file mode 100644
index 0000000..2136061
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * A placeholder state handle for shared state that will be replaced by the 
original that was
+ * created in a previous checkpoint. So we don't have to send the handle 
twice, e.g. in
+ * case of {@link ByteStreamStateHandle}. To be used in the referenced states 
of
+ * {@link IncrementalKeyedStateHandle}.
+ * <p>
+ * IMPORTANT: This class currently overrides equals and hash code only for 
testing purposes. They
+ * should not be called from production code. This means this class is also 
not suited to serve as
+ * a key, e.g. in hash maps.
+ */
+public class PlaceholderStreamStateHandle implements StreamStateHandle {
+
+       private static final long serialVersionUID = 1L;
+
+       /** We remember the size of the original file for which this is a 
placeholder */
+       private final long originalSize;
+
+       public PlaceholderStreamStateHandle(long originalSize) {
+               this.originalSize = originalSize;
+       }
+
+       @Override
+       public FSDataInputStream openInputStream() {
+               throw new UnsupportedOperationException(
+                       "This is only a placeholder to be replaced by a real 
StreamStateHandle in the checkpoint coordinator.");
+       }
+
+       @Override
+       public void discardState() throws Exception {
+               // nothing to do.
+       }
+
+       @Override
+       public long getStateSize() {
+               return originalSize;
+       }
+
+       /**
+        * This method should only be called in tests! This should never 
serve as key in a hash map.
+        */
+       @VisibleForTesting
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               PlaceholderStreamStateHandle that = 
(PlaceholderStreamStateHandle) o;
+
+               return originalSize == that.originalSize;
+       }
+
+       /**
+        * This method should only be called in tests! This should never 
serve as key in a hash map.
+        */
+       @VisibleForTesting
+       @Override
+       public int hashCode() {
+               return (int) (originalSize ^ (originalSize >>> 32));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
deleted file mode 100644
index c8c4046..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * A handle to those states that are referenced by different checkpoints.
- *
- * <p> Each shared state handle is identified by a unique key. Two shared 
states
- * are considered equal if their keys are identical.
- *
- * <p> All shared states are registered at the {@link SharedStateRegistry} once
- * they are received by the {@link 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator}
- * and will be unregistered when the checkpoints are discarded. A shared state
- * will be discarded once it is not referenced by any checkpoint. A shared 
state
- * should not be referenced any more if it has been discarded.
- */
-public interface SharedStateHandle extends StateObject {
-
-       /**
-        * Return the identifier of the shared state.
-        */
-       String getRegistrationKey();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index dbf4642..9cfdec7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executor;
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
  * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
- * maintain the reference count of {@link SharedStateHandle}s which are shared
+ * maintain the reference count of {@link StreamStateHandle}s which are shared
  * among different incremental checkpoints.
  */
 public class SharedStateRegistry {

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
index 9e59359..58262ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringBasedID;
 
 /**
  * This class represents a key that uniquely identifies (on a logical level) 
state handles for
@@ -28,41 +27,16 @@ import java.io.Serializable;
  * be the same should have the same {@link SharedStateRegistryKey}. The 
meaning of logical
  * equivalence is up to the application.
  */
-public class SharedStateRegistryKey implements Serializable {
+public class SharedStateRegistryKey extends StringBasedID {
 
        private static final long serialVersionUID = 1L;
 
-       /** Uses a String as internal representation */
-       private final String keyString;
-
-       public SharedStateRegistryKey(String keyString) {
-               this.keyString = Preconditions.checkNotNull(keyString);
-       }
-
-       public String getKeyString() {
-               return keyString;
+       public SharedStateRegistryKey(String prefix, StateHandleID 
stateHandleID) {
+               super(prefix + '-' + stateHandleID);
        }
 
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               SharedStateRegistryKey that = (SharedStateRegistryKey) o;
-               return keyString.equals(that.keyString);
-       }
-
-       @Override
-       public int hashCode() {
-               return keyString.hashCode();
-       }
-
-       @Override
-       public String toString() {
-               return keyString;
+       @VisibleForTesting
+       public SharedStateRegistryKey(String keyString) {
+               super(keyString);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
deleted file mode 100644
index b736252..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * StateHandle is a general handle interface meant to abstract operator state 
fetching. 
- * A StateHandle implementation can for example include the state itself in 
cases where the state 
- * is lightweight or fetching it lazily from some external storage when the 
state is too large.
- */
-public interface StateHandle<T> extends StateObject {
-
-       /**
-        * This retrieves and return the state represented by the handle.
-        *
-        * @param userCodeClassLoader Class loader for deserializing user code 
specific classes
-        *
-        * @return The state represented by the handle.
-        * @throws java.lang.Exception Thrown, if the state cannot be fetched.
-        */
-       T getState(ClassLoader userCodeClassLoader) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
new file mode 100644
index 0000000..5e95cff
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+/**
+ * Unique ID that allows for logical comparison between state handles.
+ * <p>
+ * Two state handles that are considered as logically equal should always 
return the same ID
+ * (whatever logically equal means is up to the implementation). For example, 
this could be based
+ * on the string representation of the full filepath for a state that is based 
on a file.
+ */
+public class StateHandleID extends StringBasedID {
+
+       private static final long serialVersionUID = 1L;
+
+       public StateHandleID(String keyString) {
+               super(keyString);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index ba77dbc..b63782d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -27,10 +27,15 @@ import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.StringUtils;
@@ -41,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -78,6 +84,7 @@ public class CheckpointTestUtils {
 
                        boolean hasKeyedBackend = random.nextInt(4) != 0;
                        boolean hasKeyedStream = random.nextInt(4) != 0;
+                       boolean isIncremental = random.nextInt(3) == 0;
 
                        for (int subtaskIdx = 0; subtaskIdx < 
numSubtasksPerTask; subtaskIdx++) {
 
@@ -108,21 +115,19 @@ public class CheckpointTestUtils {
                                        operatorStateHandleStream = new 
OperatorStateHandle(offsetsMap, operatorStateStream);
                                }
 
-                               KeyGroupsStateHandle keyedStateBackend = null;
-                               KeyGroupsStateHandle keyedStateStream = null;
+                               KeyedStateHandle keyedStateBackend = null;
+                               KeyedStateHandle keyedStateStream = null;
 
                                if (hasKeyedBackend) {
-                                       keyedStateBackend = new 
KeyGroupsStateHandle(
-                                                       new 
KeyGroupRangeOffsets(1, 1, new long[]{42}),
-                                                       new 
TestByteStreamStateHandleDeepCompare("c", "Hello"
-                                                                       
.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+                                       if (isIncremental) {
+                                               keyedStateBackend = 
createDummyIncrementalKeyedStateHandle(random);
+                                       } else {
+                                               keyedStateBackend = 
createDummyKeyGroupStateHandle(random);
+                                       }
                                }
 
                                if (hasKeyedStream) {
-                                       keyedStateStream = new 
KeyGroupsStateHandle(
-                                                       new 
KeyGroupRangeOffsets(1, 1, new long[]{23}),
-                                                       new 
TestByteStreamStateHandleDeepCompare("d", "World"
-                                                                       
.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+                                       keyedStateStream = 
createDummyKeyGroupStateHandle(random);
                                }
 
                                taskState.putState(subtaskIdx, new 
OperatorSubtaskState(
@@ -210,17 +215,11 @@ public class CheckpointTestUtils {
                                KeyGroupsStateHandle keyedStateStream = null;
 
                                if (hasKeyedBackend) {
-                                       keyedStateBackend = new 
KeyGroupsStateHandle(
-                                                       new 
KeyGroupRangeOffsets(1, 1, new long[]{42}),
-                                                       new 
TestByteStreamStateHandleDeepCompare("c", "Hello"
-                                                               
.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+                                       keyedStateBackend = 
createDummyKeyGroupStateHandle(random);
                                }
 
                                if (hasKeyedStream) {
-                                       keyedStateStream = new 
KeyGroupsStateHandle(
-                                                       new 
KeyGroupRangeOffsets(1, 1, new long[]{23}),
-                                                       new 
TestByteStreamStateHandleDeepCompare("d", "World"
-                                                               
.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+                                       keyedStateStream = 
createDummyKeyGroupStateHandle(random);
                                }
 
                                taskState.putState(subtaskIdx, new SubtaskState(
@@ -272,4 +271,56 @@ public class CheckpointTestUtils {
 
        /** utility class, not meant to be instantiated */
        private CheckpointTestUtils() {}
+
+
+       private static IncrementalKeyedStateHandle 
createDummyIncrementalKeyedStateHandle(Random rnd) {
+               return new IncrementalKeyedStateHandle(
+                       createRandomUUID(rnd).toString(),
+                       new KeyGroupRange(1, 1),
+                       42L,
+                       createRandomOwnedHandleMap(rnd),
+                       createRandomReferencedHandleMap(rnd),
+                       createRandomOwnedHandleMap(rnd),
+                       createDummyStreamStateHandle(rnd));
+       }
+
+       private static Map<StateHandleID, StreamStateHandle> 
createRandomOwnedHandleMap(Random rnd) {
+               final int size = rnd.nextInt(4);
+               Map<StateHandleID, StreamStateHandle> result = new 
HashMap<>(size);
+               for (int i = 0; i < size; ++i) {
+                       StateHandleID randomId = new 
StateHandleID(createRandomUUID(rnd).toString());
+                       StreamStateHandle stateHandle = 
createDummyStreamStateHandle(rnd);
+                       result.put(randomId, stateHandle);
+               }
+
+               return result;
+       }
+
+       private static Map<StateHandleID, StreamStateHandle> 
createRandomReferencedHandleMap(Random rnd) {
+               final int size = rnd.nextInt(4);
+               Map<StateHandleID, StreamStateHandle> result = new 
HashMap<>(size);
+               for (int i = 0; i < size; ++i) {
+                       StateHandleID randomId = new 
StateHandleID(createRandomUUID(rnd).toString());
+                       result.put(randomId, new 
PlaceholderStreamStateHandle(rnd.nextInt(1024)));
+               }
+
+               return result;
+       }
+
+       private static KeyGroupsStateHandle 
createDummyKeyGroupStateHandle(Random rnd) {
+               return new KeyGroupsStateHandle(
+                       new KeyGroupRangeOffsets(1, 1, new 
long[]{rnd.nextInt(1024)}),
+                       createDummyStreamStateHandle(rnd));
+       }
+
+       private static StreamStateHandle createDummyStreamStateHandle(Random 
rnd) {
+               return new TestByteStreamStateHandleDeepCompare(
+                       String.valueOf(createRandomUUID(rnd)),
+                       
String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));
+       }
+
+       private static UUID createRandomUUID(Random rnd) {
+               return new UUID(rnd.nextLong(), rnd.nextLong());
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
index 154d761..602390b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
-
 import org.junit.Test;
 
 import java.io.DataInputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
deleted file mode 100644
index d6966d0..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.RunnableFuture;
-
-public class StateUtilTest extends TestLogger {
-
-       /**
-        * Tests that {@link StateUtil#discardStateFuture} can handle state 
futures with null value.
-        */
-       @Test
-       public void testDiscardRunnableFutureWithNullValue() throws Exception {
-               RunnableFuture<StateHandle<?>> stateFuture = 
DoneFuture.nullValue();
-               StateUtil.discardStateFuture(stateFuture);
-       }
-}

Reply via email to