[FLINK-6633] Register shared state before adding to CompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0162543a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0162543a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0162543a

Branch: refs/heads/master
Commit: 0162543ac13f048ef67a6586d8a6e8021ec9dcd4
Parents: 3d119e1
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri May 19 10:57:32 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  56 +--
 .../state/RocksDBStateBackendTest.java          |  88 ++++-
 .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-----
 .../flink/runtime/checkpoint/OperatorState.java |   7 -
 .../checkpoint/OperatorSubtaskState.java        |  11 -
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |  11 -
 .../flink/runtime/checkpoint/TaskState.java     |   7 -
 .../ZooKeeperCompletedCheckpointStore.java      | 149 +++-----
 .../savepoint/SavepointV2Serializer.java        |  17 +-
 .../runtime/state/CompositeStateHandle.java     |  15 +-
 .../state/IncrementalKeyedStateHandle.java      | 171 ++++-----
 .../runtime/state/KeyGroupsStateHandle.java     |   5 -
 .../state/PlaceholderStreamStateHandle.java     |  44 +--
 .../runtime/state/SharedStateRegistry.java      |  54 +--
 .../state/memory/ByteStreamStateHandle.java     |   7 +
 .../checkpoint/CheckpointCoordinatorTest.java   |  25 --
 .../CompletedCheckpointStoreTest.java           |  61 +--
 .../checkpoint/CompletedCheckpointTest.java     |   3 -
 .../checkpoint/PendingCheckpointTest.java       |   1 -
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   7 +-
 .../savepoint/CheckpointTestUtils.java          |  25 +-
 .../state/IncrementalKeyedStateHandleTest.java  | 206 ++++++++++
 .../runtime/state/SharedStateRegistryTest.java  |  14 +-
 .../runtime/state/StateBackendTestBase.java     |   2 -
 .../RecoverableCompletedCheckpointStore.java    |   5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   | 375 +++++++++----------
 28 files changed, 747 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** True if incremental checkpointing is enabled */
        private final boolean enableIncrementalCheckpointing;
 
-       /** The sst files materialized in pending checkpoints */
-       private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
materializedSstFiles = new TreeMap<>();
+       /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints */
+       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles 
= new TreeMap<>();
 
        /** The identifier of the last completed checkpoint */
        private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private final long checkpointTimestamp;
 
                /** All sst files that were part of the last previously 
completed checkpoint */
-               private Map<StateHandleID, StreamStateHandle> baseSstFiles;
+               private Set<StateHandleID> baseSstFiles;
 
                /** The state meta data */
                private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
                // new sst files since the last completed checkpoint
-               private final Map<StateHandleID, StreamStateHandle> newSstFiles 
= new HashMap<>();
-
-               // old sst files which have been materialized in previous 
completed checkpoints
-               private final Map<StateHandleID, StreamStateHandle> oldSstFiles 
= new HashMap<>();
+               private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
 
                // handles to the misc files in the current snapshot
                private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
@@ -830,7 +828,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // use the last completed checkpoint as the comparison 
base.
                        baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
-
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                                        : 
stateBackend.kvStateInformation.entrySet()) {
@@ -867,18 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        final StateHandleID stateHandleID = new 
StateHandleID(fileName);
 
                                        if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-                                               StreamStateHandle fileHandle =
-                                                       baseSstFiles == null ? 
null : baseSstFiles.get(fileName);
+                                               final boolean existsAlready =
+                                                       baseSstFiles == null ? 
false : baseSstFiles.contains(stateHandleID);
 
-                                               if (fileHandle == null) {
-                                                       fileHandle = 
materializeStateData(filePath);
-                                                       
newSstFiles.put(stateHandleID, fileHandle);
-                                               } else {
+                                               if (existsAlready) {
                                                        // we introduce a 
placeholder state handle, that is replaced with the
                                                        // original from the 
shared state registry (created from a previous checkpoint)
-                                                       oldSstFiles.put(
+                                                       sstFiles.put(
                                                                stateHandleID,
-                                                               new 
PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+                                                               new 
PlaceholderStreamStateHandle());
+                                               } else {
+                                                       
sstFiles.put(stateHandleID, materializeStateData(filePath));
                                                }
                                        } else {
                                                StreamStateHandle fileHandle = 
materializeStateData(filePath);
@@ -887,22 +883,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
                        }
 
-                       Map<StateHandleID, StreamStateHandle> sstFiles =
-                               new HashMap<>(newSstFiles.size() + 
oldSstFiles.size());
 
-                       sstFiles.putAll(newSstFiles);
-                       sstFiles.putAll(oldSstFiles);
 
                        synchronized (stateBackend.asyncSnapshotLock) {
-                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
                        }
 
                        return new IncrementalKeyedStateHandle(
                                stateBackend.operatorIdentifier,
                                stateBackend.keyGroupRange,
                                checkpointId,
-                               newSstFiles,
-                               oldSstFiles,
+                               sstFiles,
                                miscFiles,
                                metaStateHandle);
                }
@@ -933,7 +924,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                statesToDiscard.add(metaStateHandle);
                                statesToDiscard.addAll(miscFiles.values());
-                               statesToDiscard.addAll(newSstFiles.values());
+                               statesToDiscard.addAll(sstFiles.values());
 
                                try {
                                        
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
@@ -1308,15 +1299,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                UUID.randomUUID().toString());
 
                        try {
-                               final Map<StateHandleID, StreamStateHandle> 
newSstFiles =
-                                       
restoreStateHandle.getCreatedSharedState();
-                               final Map<StateHandleID, StreamStateHandle> 
oldSstFiles =
-                                       
restoreStateHandle.getReferencedSharedState();
+                               final Map<StateHandleID, StreamStateHandle> 
sstFiles =
+                                       restoreStateHandle.getSharedState();
                                final Map<StateHandleID, StreamStateHandle> 
miscFiles =
                                        restoreStateHandle.getPrivateState();
 
-                               readAllStateData(newSstFiles, 
restoreInstancePath);
-                               readAllStateData(oldSstFiles, 
restoreInstancePath);
+                               readAllStateData(sstFiles, restoreInstancePath);
                                readAllStateData(miscFiles, 
restoreInstancePath);
 
                                // read meta data
@@ -1409,8 +1397,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                throw new IOException("Could 
not create RocksDB data directory.");
                                        }
 
-                                       
createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
-                                       
createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+                                       
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
                                        
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
                                        List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>();
@@ -1437,10 +1424,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
 
                                        // use the restore sst files as the 
base for succeeding checkpoints
-                                       Map<StateHandleID, StreamStateHandle> 
sstFiles = new HashMap<>();
-                                       sstFiles.putAll(newSstFiles);
-                                       sstFiles.putAll(oldSstFiles);
-                                       
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles);
+                                       
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
 
                                        stateBackend.lastCompletedCheckpointId 
= restoreStateHandle.getCheckpointId();
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9340455..89eb1d5 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,18 +26,22 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,7 +62,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 
 import static junit.framework.TestCase.assertNotNull;
@@ -67,6 +75,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -351,6 +360,83 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                assertEquals(1, allFilesInDbDir.size());
        }
 
+       @Test
+       public void testSharedIncrementalStateDeRegistration() throws Exception 
{
+               if (enableIncrementalCheckpointing) {
+                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+                       ValueStateDescriptor<String> kvId =
+                               new ValueStateDescriptor<>("id", String.class, 
null);
+
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+                       ValueState<String> state =
+                               
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+
+                       Queue<IncrementalKeyedStateHandle> previousStateHandles 
= new LinkedList<>();
+                       SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
+                       for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
+
+                               reset(sharedStateRegistry);
+
+                               backend.setCurrentKey(checkpointId);
+                               state.update("Hello-" + checkpointId);
+
+                               RunnableFuture<KeyedStateHandle> snapshot = 
backend.snapshot(
+                                       checkpointId,
+                                       checkpointId,
+                                       createStreamFactory(),
+                                       CheckpointOptions.forFullCheckpoint());
+
+                               snapshot.run();
+
+                               IncrementalKeyedStateHandle stateHandle = 
(IncrementalKeyedStateHandle) snapshot.get();
+                               Map<StateHandleID, StreamStateHandle> 
sharedState =
+                                       new 
HashMap<>(stateHandle.getSharedState());
+
+                               
stateHandle.registerSharedStates(sharedStateRegistry);
+
+                               for (Map.Entry<StateHandleID, 
StreamStateHandle> e : sharedState.entrySet()) {
+                                       
verify(sharedStateRegistry).registerReference(
+                                               
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+                                               e.getValue());
+                               }
+
+                               previousStateHandles.add(stateHandle);
+                               backend.notifyCheckpointComplete(checkpointId);
+
+                               
//-----------------------------------------------------------------
+
+                               if (previousStateHandles.size() > 1) {
+                                       
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                               }
+                       }
+
+                       while (!previousStateHandles.isEmpty()) {
+
+                               reset(sharedStateRegistry);
+
+                               checkRemove(previousStateHandles.remove(), 
sharedStateRegistry);
+                       }
+
+                       backend.close();
+                       backend.dispose();
+               }
+       }
+
+       private void checkRemove(IncrementalKeyedStateHandle remove, 
SharedStateRegistry registry) throws Exception {
+               for (StateHandleID id : remove.getSharedState().keySet()) {
+                       verify(registry, times(0)).unregisterReference(
+                               
remove.createSharedStateRegistryKeyFromFileName(id));
+               }
+
+               remove.discardState();
+
+               for (StateHandleID id : remove.getSharedState().keySet()) {
+                       verify(registry).unregisterReference(
+                               
remove.createSharedStateRegistryKeyFromFileName(id));
+               }
+       }
 
        private void runStateUpdates() throws Exception{
                for (int i = 50; i < 150; ++i) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1ab5b41..b382080 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,13 +175,13 @@ public class CompletedCheckpoint implements Serializable {
        }
 
        public void discardOnFailedStoring() throws Exception {
-               new UnstoredDiscardStategy().discard();
+               doDiscard();
        }
 
        public boolean discardOnSubsume(SharedStateRegistry 
sharedStateRegistry) throws Exception {
 
                if (props.discardOnSubsumed()) {
-                       new 
StoredDiscardStrategy(sharedStateRegistry).discard();
+                       doDiscard();
                        return true;
                }
 
@@ -197,7 +195,7 @@ public class CompletedCheckpoint implements Serializable {
                                jobStatus == JobStatus.FAILED && 
props.discardOnJobFailed() ||
                                jobStatus == JobStatus.SUSPENDED && 
props.discardOnJobSuspended()) {
 
-                       new 
StoredDiscardStrategy(sharedStateRegistry).discard();
+                       doDiscard();
                        return true;
                } else {
                        if (externalPointer != null) {
@@ -209,6 +207,42 @@ public class CompletedCheckpoint implements Serializable {
                }
        }
 
+       private void doDiscard() throws Exception {
+
+               try {
+                       // collect exceptions and continue cleanup
+                       Exception exception = null;
+
+                       // drop the metadata, if we have some
+                       if (externalizedMetadata != null) {
+                               try {
+                                       externalizedMetadata.discardState();
+                               } catch (Exception e) {
+                                       exception = e;
+                               }
+                       }
+
+                       // discard private state objects
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       if (exception != null) {
+                               throw exception;
+                       }
+               } finally {
+                       operatorStates.clear();
+
+                       // to be null-pointer safe, copy reference to stack
+                       CompletedCheckpointStats.DiscardCallback 
discardCallback = this.discardCallback;
+                       if (discardCallback != null) {
+                               discardCallback.notifyDiscardedCheckpoint();
+                       }
+               }
+       }
+
        public long getStateSize() {
                long result = 0L;
 
@@ -252,7 +286,7 @@ public class CompletedCheckpoint implements Serializable {
 
        /**
         * Register all shared states in the given registry. This is method is 
called
-        * when the completed checkpoint has been successfully added into the 
store.
+        * before the checkpoint is added into the store.
         *
         * @param sharedStateRegistry The registry where shared states are 
registered
         */
@@ -266,102 +300,4 @@ public class CompletedCheckpoint implements Serializable {
        public String toString() {
                return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
        }
-
-       /**
-        * Base class for the discarding strategies of {@link 
CompletedCheckpoint}.
-        */
-       private abstract class DiscardStrategy {
-
-               protected Exception storedException;
-
-               public DiscardStrategy() {
-                       this.storedException = null;
-               }
-
-               public void discard() throws Exception {
-
-                       try {
-                               // collect exceptions and continue cleanup
-                               storedException = null;
-
-                               doDiscardExternalizedMetaData();
-                               doDiscardSharedState();
-                               doDiscardPrivateState();
-                               doReportStoredExceptions();
-                       } finally {
-                               clearTaskStatesAndNotifyDiscardCompleted();
-                       }
-               }
-
-               protected void doDiscardExternalizedMetaData() {
-                       // drop the metadata, if we have some
-                       if (externalizedMetadata != null) {
-                               try {
-                                       externalizedMetadata.discardState();
-                               } catch (Exception e) {
-                                       storedException = e;
-                               }
-                       }
-               }
-
-               protected void doDiscardPrivateState() {
-                       // discard private state objects
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-                       } catch (Exception e) {
-                               storedException = 
ExceptionUtils.firstOrSuppressed(e, storedException);
-                       }
-               }
-
-               protected abstract void doDiscardSharedState();
-
-               protected void doReportStoredExceptions() throws Exception {
-                       if (storedException != null) {
-                               throw storedException;
-                       }
-               }
-
-               protected void clearTaskStatesAndNotifyDiscardCompleted() {
-                       operatorStates.clear();
-                       // to be null-pointer safe, copy reference to stack
-                       CompletedCheckpointStats.DiscardCallback 
discardCallback =
-                               CompletedCheckpoint.this.discardCallback;
-
-                       if (discardCallback != null) {
-                               discardCallback.notifyDiscardedCheckpoint();
-                       }
-               }
-       }
-
-       /**
-        * Discard all shared states created in the checkpoint. This strategy 
is applied
-        * when the completed checkpoint fails to be added into the store.
-        */
-       private class UnstoredDiscardStategy extends 
CompletedCheckpoint.DiscardStrategy {
-
-               @Override
-               protected void doDiscardSharedState() {
-                       // nothing to do because we did not register any shared 
state yet. unregistered, new
-                       // shared state is then still considered private state 
and deleted as part of
-                       // doDiscardPrivateState().
-               }
-       }
-
-       /**
-        * Unregister all shared states from the given registry. This is 
strategy is
-        * applied when the completed checkpoint is subsumed or the job 
terminates.
-        */
-       private class StoredDiscardStrategy extends 
CompletedCheckpoint.DiscardStrategy {
-
-               SharedStateRegistry sharedStateRegistry;
-
-               public StoredDiscardStrategy(SharedStateRegistry 
sharedStateRegistry) {
-                       this.sharedStateRegistry = 
Preconditions.checkNotNull(sharedStateRegistry);
-               }
-
-               @Override
-               protected void doDiscardSharedState() {
-                       
sharedStateRegistry.unregisterAll(operatorStates.values());
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index aa676e7..b153028 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -126,13 +126,6 @@ public class OperatorState implements CompositeStateHandle 
{
        }
 
        @Override
-       public void unregisterSharedStates(SharedStateRegistry 
sharedStateRegistry) {
-               for (OperatorSubtaskState operatorSubtaskState : 
operatorSubtaskStates.values()) {
-                       
operatorSubtaskState.unregisterSharedStates(sharedStateRegistry);
-               }
-       }
-
-       @Override
        public long getStateSize() {
                long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 49ef863..e2ae632 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -158,17 +158,6 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        }
 
        @Override
-       public void unregisterSharedStates(SharedStateRegistry 
sharedStateRegistry) {
-               if (managedKeyedState != null) {
-                       
managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-               }
-
-               if (rawKeyedState != null) {
-                       
rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-               }
-       }
-
-       @Override
        public long getStateSize() {
                return stateSize;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index f5e1db3..233cfc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -63,10 +63,10 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompletedCheckpo
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
                
-               checkpoints.addLast(checkpoint);
-
                checkpoint.registerSharedStates(sharedStateRegistry);
 
+               checkpoints.addLast(checkpoint);
+
                if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
                        try {
                                CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index a77baf3..20d675b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -162,17 +162,6 @@ public class SubtaskState implements CompositeStateHandle {
        }
 
        @Override
-       public void unregisterSharedStates(SharedStateRegistry 
sharedStateRegistry) {
-               if (managedKeyedState != null) {
-                       
managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-               }
-
-               if (rawKeyedState != null) {
-                       
rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-               }
-       }
-
-       @Override
        public long getStateSize() {
                return stateSize;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index aa5c516..ed847a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -141,13 +141,6 @@ public class TaskState implements CompositeStateHandle {
        }
 
        @Override
-       public void unregisterSharedStates(SharedStateRegistry 
sharedStateRegistry) {
-               for (SubtaskState subtaskState : subtaskStates.values()) {
-                       
subtaskState.unregisterSharedStates(sharedStateRegistry);
-               }
-       }
-
-       @Override
        public long getStateSize() {
                long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 084d93e..4c3c1ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -79,7 +78,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
        private final int maxNumberOfCheckpointsToRetain;
 
        /** Local completed checkpoints. */
-       private final 
ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
checkpointStateHandles;
+       private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
        /**
         * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -122,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
-               this.checkpointStateHandles = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+               this.completedCheckpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
                
                LOG.info("Initialized in '{}'.", checkpointsPath);
        }
@@ -146,7 +145,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                // Clear local handles in order to prevent duplicates on
                // recovery. The local handles should reflect the state
                // of ZooKeeper.
-               checkpointStateHandles.clear();
+               completedCheckpoints.clear();
 
                // Get all there is first
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
@@ -170,6 +169,11 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                        try {
                                completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
+                               if (completedCheckpoint != null) {
+                                       // Re-register all shared states in the 
checkpoint.
+                                       
completedCheckpoint.registerSharedStates(sharedStateRegistry);
+                                       
completedCheckpoints.add(completedCheckpoint);
+                               }
                        } catch (Exception e) {
                                LOG.warn("Could not retrieve checkpoint. 
Removing it from the completed " +
                                        "checkpoint store.", e);
@@ -177,11 +181,6 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                                // remove the checkpoint with broken state 
handle
                                
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
                        }
-
-                       if (completedCheckpoint != null) {
-                               
completedCheckpoint.registerSharedStates(sharedStateRegistry);
-                               
checkpointStateHandles.add(checkpointStateHandle);
-                       }
                }
        }
 
@@ -195,20 +194,19 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                checkNotNull(checkpoint, "Checkpoint");
                
                final String path = 
checkpointIdToPath(checkpoint.getCheckpointID());
-               final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
-               // First add the new one. If it fails, we don't want to loose 
existing data.
-               stateHandle = checkpointsInZooKeeper.addAndLock(path, 
checkpoint);
+               // First, register all shared states in the checkpoint to 
consolidate placeholders.
+               checkpoint.registerSharedStates(sharedStateRegistry);
 
-               checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+               // Now add the new one. If it fails, we don't want to lose 
existing data.
+               checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
-               // Register all shared states in the checkpoint
-               checkpoint.registerSharedStates(sharedStateRegistry);
+               completedCheckpoints.addLast(checkpoint);
 
                // Everything worked, let's remove a previous checkpoint if 
necessary.
-               while (checkpointStateHandles.size() > 
maxNumberOfCheckpointsToRetain) {
+               while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
                        try {
-                               
removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
+                               
removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
                        } catch (Exception e) {
                                LOG.warn("Failed to subsume the old 
checkpoint", e);
                        }
@@ -219,60 +217,23 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
        @Override
        public CompletedCheckpoint getLatestCheckpoint() {
-               if (checkpointStateHandles.isEmpty()) {
+               if (completedCheckpoints.isEmpty()) {
                        return null;
                }
                else {
-                       while(!checkpointStateHandles.isEmpty()) {
-                               
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle = checkpointStateHandles.peekLast();
-
-                               try {
-                                       return 
retrieveCompletedCheckpoint(checkpointStateHandle);
-                               } catch (Exception e) {
-                                       LOG.warn("Could not retrieve latest 
checkpoint. Removing it from " +
-                                               "the completed checkpoint 
store.", e);
-
-                                       try {
-                                               // remove the checkpoint with 
broken state handle
-                                               
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = 
checkpointStateHandles.pollLast();
-                                               
removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
-                                       } catch (Exception removeException) {
-                                               LOG.warn("Could not remove the 
latest checkpoint with a broken state handle.", removeException);
-                                       }
-                               }
-                       }
-
-                       return null;
+                       return completedCheckpoints.peekLast();
                }
        }
 
        @Override
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
-               List<CompletedCheckpoint> checkpoints = new 
ArrayList<>(checkpointStateHandles.size());
-
-               Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> stateHandleIterator = checkpointStateHandles.iterator();
-
-               while (stateHandleIterator.hasNext()) {
-                       Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String> stateHandlePath = stateHandleIterator.next();
-
-                       try {
-                               
checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath));
-                       } catch (Exception e) {
-                               LOG.warn("Could not retrieve checkpoint. 
Removing it from the completed " +
-                                       "checkpoint store.", e);
-
-                               // remove the checkpoint with broken state 
handle
-                               stateHandleIterator.remove();
-                               removeBrokenStateHandle(stateHandlePath.f1, 
stateHandlePath.f0);
-                       }
-               }
-
+               List<CompletedCheckpoint> checkpoints = new 
ArrayList<>(completedCheckpoints);
                return checkpoints;
        }
 
        @Override
        public int getNumberOfRetainedCheckpoints() {
-               return checkpointStateHandles.size();
+               return completedCheckpoints.size();
        }
 
        @Override
@@ -285,15 +246,15 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                if (jobStatus.isGloballyTerminalState()) {
                        LOG.info("Shutting down");
 
-                       for 
(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : 
checkpointStateHandles) {
+                       for (CompletedCheckpoint checkpoint : 
completedCheckpoints) {
                                try {
-                                       removeShutdown(checkpoint.f1, 
jobStatus, sharedStateRegistry);
+                                       removeShutdown(checkpoint, jobStatus, 
sharedStateRegistry);
                                } catch (Exception e) {
                                        LOG.error("Failed to discard 
checkpoint.", e);
                                }
                        }
 
-                       checkpointStateHandles.clear();
+                       completedCheckpoints.clear();
 
                        String path = "/" + client.getNamespace();
 
@@ -303,7 +264,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                        LOG.info("Suspending");
 
                        // Clear the local handles, but don't remove any state
-                       checkpointStateHandles.clear();
+                       completedCheckpoints.clear();
 
                        // Release the state handle locks in ZooKeeper such 
that they can be deleted
                        checkpointsInZooKeeper.releaseAll();
@@ -313,21 +274,18 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
        // 
------------------------------------------------------------------------
 
        private void removeSubsumed(
-               final String pathInZooKeeper,
+               final CompletedCheckpoint completedCheckpoint,
                final SharedStateRegistry sharedStateRegistry) throws Exception 
{
-               
-               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                       @Override
-                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                               if (value != null) {
-                                       final CompletedCheckpoint 
completedCheckpoint;
-                                       try {
-                                               completedCheckpoint = 
value.retrieveState();
-                                       } catch (Exception e) {
-                                               throw new FlinkException("Could 
not retrieve the completed checkpoint from the given state handle.", e);
-                                       }
 
-                                       if (completedCheckpoint != null) {
+               if(completedCheckpoint == null) {
+                       return;
+               }
+
+               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
action =
+                       new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
+                               @Override
+                               public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+                                       if (value != null) {
                                                try {
                                                        
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
                                                } catch (Exception e) {
@@ -335,46 +293,41 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                                                }
                                        }
                                }
-                       }
-               };
+                       };
 
-               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, 
action);
+               checkpointsInZooKeeper.releaseAndTryRemove(
+                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+                       action);
        }
 
        private void removeShutdown(
-                       final String pathInZooKeeper,
+                       final CompletedCheckpoint completedCheckpoint,
                        final JobStatus jobStatus,
                        final SharedStateRegistry sharedStateRegistry) throws 
Exception {
 
+               if(completedCheckpoint == null) {
+                       return;
+               }
+
                ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
removeAction = new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                        @Override
                        public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                               if (value != null) {
-                                       final CompletedCheckpoint 
completedCheckpoint;
-
-                                       try {
-                                               completedCheckpoint = 
value.retrieveState();
-                                       } catch (Exception e) {
-                                               throw new FlinkException("Could 
not retrieve the completed checkpoint from the given state handle.", e);
-                                       }
-
-                                       if (completedCheckpoint != null) {
-                                               try {
-                                                       
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-                                               } catch (Exception e) {
-                                                       throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
-                                               }
-                                       }
+                               try {
+                                       
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+                               } catch (Exception e) {
+                                       throw new FlinkException("Could not 
discard the completed checkpoint on subsume.", e);
                                }
                        }
                };
 
-               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, 
removeAction);
+               checkpointsInZooKeeper.releaseAndTryRemove(
+                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+                       removeAction);
        }
 
        private void removeBrokenStateHandle(
-                       final String pathInZooKeeper,
-                       final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle) throws Exception {
+               final String pathInZooKeeper,
+               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle) throws Exception {
                checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                        @Override
                        public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index b71418b..da0022c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -75,7 +74,6 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
        private static final byte KEY_GROUPS_HANDLE = 3;
        private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
        private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
-       private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
 
        /** The singleton instance of the serializer */
        public static final SavepointV2Serializer INSTANCE = new 
SavepointV2Serializer();
@@ -328,8 +326,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
 
                        
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), 
dos);
 
-                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(),
 dos);
-                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(),
 dos);
+                       
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), 
dos);
                        
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), 
dos);
                } else {
                        throw new IllegalStateException("Unknown 
KeyedStateHandle type: " + stateHandle.getClass());
@@ -390,16 +387,14 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                KeyGroupRange.of(startKeyGroup, startKeyGroup + 
numKeyGroups - 1);
 
                        StreamStateHandle metaDataStateHandle = 
deserializeStreamStateHandle(dis);
-                       Map<StateHandleID, StreamStateHandle> createdStates = 
deserializeStreamStateHandleMap(dis);
-                       Map<StateHandleID, StreamStateHandle> referencedStates 
= deserializeStreamStateHandleMap(dis);
+                       Map<StateHandleID, StreamStateHandle> sharedStates = 
deserializeStreamStateHandleMap(dis);
                        Map<StateHandleID, StreamStateHandle> privateStates = 
deserializeStreamStateHandleMap(dis);
 
                        return new IncrementalKeyedStateHandle(
                                operatorId,
                                keyGroupRange,
                                checkpointId,
-                               createdStates,
-                               referencedStates,
+                               sharedStates,
                                privateStates,
                                metaDataStateHandle);
                } else {
@@ -485,10 +480,6 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        byte[] internalData = byteStreamStateHandle.getData();
                        dos.writeInt(internalData.length);
                        dos.write(byteStreamStateHandle.getData());
-               } else if (stateHandle instanceof PlaceholderStreamStateHandle) 
{
-                       PlaceholderStreamStateHandle placeholder = 
(PlaceholderStreamStateHandle) stateHandle;
-                       dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
-                       dos.writeLong(placeholder.getStateSize());
                } else {
                        throw new IOException("Unknown implementation of 
StreamStateHandle: " + stateHandle.getClass());
                }
@@ -510,8 +501,6 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        byte[] data = new byte[numBytes];
                        dis.readFully(data);
                        return new ByteStreamStateHandle(handleName, data);
-               } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
-                       return new PlaceholderStreamStateHandle(dis.readLong());
                } else {
                        throw new IOException("Unknown implementation of 
StreamStateHandle, code: " + type);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
index 002b7c3..1bc6a0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
@@ -34,7 +34,8 @@ package org.apache.flink.runtime.state;
  * this handle and considered as private state until it is registered for the 
first time. Registration
  * transfers ownership to the {@link SharedStateRegistry}.
  * The composite state handle should only delete all private states in the
- * {@link StateObject#discardState()} method.
+ * {@link StateObject#discardState()} method, the {@link SharedStateRegistry} 
is responsible for
+ * deleting shared states after they were registered.
  */
 public interface CompositeStateHandle extends StateObject {
 
@@ -45,18 +46,10 @@ public interface CompositeStateHandle extends StateObject {
         * <p>
         * After this is completed, newly created shared state is considered as 
published is no longer
         * owned by this handle. This means that it should no longer be deleted 
as part of calls to
-        * {@link #discardState()}.
+        * {@link #discardState()}. Instead, {@link #discardState()} will 
trigger an unregistration
+        * from the registry.
         *
         * @param stateRegistry The registry where shared states are registered.
         */
        void registerSharedStates(SharedStateRegistry stateRegistry);
-
-       /**
-        * Unregister both created and referenced shared states in the given
-        * {@link SharedStateRegistry}. This method is called when the 
checkpoint is
-        * subsumed or the job is shut down.
-        *
-        * @param stateRegistry The registry where shared states are registered.
-        */
-       void unregisterSharedStates(SharedStateRegistry stateRegistry);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 706e219..770b5a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -28,18 +28,24 @@ import java.util.Map;
 /**
  * The handle to states of an incremental snapshot.
  * <p>
- * The states contained in an incremental snapshot include
+ * The states contained in an incremental snapshot include:
  * <ul>
- * <li> Created shared state which includes (the supposed to be) shared files 
produced since the last
+ * <li> Created shared state which includes shared files produced since the 
last
  * completed checkpoint. These files can be referenced by succeeding 
checkpoints if the
  * checkpoint succeeds to complete. </li>
  * <li> Referenced shared state which includes the shared files materialized 
in previous
- * checkpoints. </li>
+ * checkpoints. Until this is registered with a {@link SharedStateRegistry}, 
all referenced
+ * shared state handles are only placeholders, so that we do not send state 
handles twice
+ * from which we know that they already exist on the checkpoint 
coordinator.</li>
  * <li> Private state which includes all other files, typically mutable, that 
cannot be shared by
  * other checkpoints. </li>
  * <li> Backend meta state which includes the information of existing states. 
</li>
  * </ul>
  *
+ * When this should become a completed checkpoint on the checkpoint 
coordinator, it must first be
+ * registered with a {@link SharedStateRegistry}, so that all placeholder 
state handles to
+ * previously existing state are replaced with the originals.
+ *
  * IMPORTANT: This class currently overrides equals and hash code only for 
testing purposes. They
  * should not be called from production code. This means this class is also 
not suited to serve as
  * a key, e.g. in hash maps.
@@ -66,14 +72,9 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private final long checkpointId;
 
        /**
-        * State that the incremental checkpoint created new
-        */
-       private final Map<StateHandleID, StreamStateHandle> createdSharedState;
-
-       /**
-        * State that the incremental checkpoint references from previous 
checkpoints
+        * Shared state in the incremental checkpoint. This includes both newly created and referenced shared state.
         */
-       private final Map<StateHandleID, StreamStateHandle> 
referencedSharedState;
+       private final Map<StateHandleID, StreamStateHandle> sharedState;
 
        /**
         * Private state in the incremental checkpoint
@@ -86,32 +87,30 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private final StreamStateHandle metaStateHandle;
 
        /**
-        * True if the state handle has already registered shared states.
-        * <p>
-        * Once the shared states are registered, it's the {@link 
SharedStateRegistry}'s
-        * responsibility to maintain the shared states. But in the cases where 
the
-        * state handle is discarded before performing the registration, the 
handle
-        * should delete all the shared states created by it.
+        * Once the shared states are registered, it is the {@link 
SharedStateRegistry}'s
+        * responsibility to cleanup those shared states.
+        * But in the cases where the state handle is discarded before 
performing the registration,
+        * the handle should delete all the shared states created by it.
+        *
+        * This variable is not null iff the handle was registered.
         */
-       private boolean registered;
+       private transient SharedStateRegistry sharedStateRegistry;
 
        public IncrementalKeyedStateHandle(
                String operatorIdentifier,
                KeyGroupRange keyGroupRange,
                long checkpointId,
-               Map<StateHandleID, StreamStateHandle> createdSharedState,
-               Map<StateHandleID, StreamStateHandle> referencedSharedState,
+               Map<StateHandleID, StreamStateHandle> sharedState,
                Map<StateHandleID, StreamStateHandle> privateState,
                StreamStateHandle metaStateHandle) {
 
                this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
                this.checkpointId = checkpointId;
-               this.createdSharedState = 
Preconditions.checkNotNull(createdSharedState);
-               this.referencedSharedState = 
Preconditions.checkNotNull(referencedSharedState);
+               this.sharedState = Preconditions.checkNotNull(sharedState);
                this.privateState = Preconditions.checkNotNull(privateState);
                this.metaStateHandle = 
Preconditions.checkNotNull(metaStateHandle);
-               this.registered = false;
+               this.sharedStateRegistry = null;
        }
 
        @Override
@@ -123,12 +122,8 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                return checkpointId;
        }
 
-       public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
-               return createdSharedState;
-       }
-
-       public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() 
{
-               return referencedSharedState;
+       public Map<StateHandleID, StreamStateHandle> getSharedState() {
+               return sharedState;
        }
 
        public Map<StateHandleID, StreamStateHandle> getPrivateState() {
@@ -155,8 +150,6 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        @Override
        public void discardState() throws Exception {
 
-               Preconditions.checkState(!registered, "Attempt to dispose a 
registered composite state with registered shared state. Must unregister 
first.");
-
                try {
                        metaStateHandle.discardState();
                } catch (Exception e) {
@@ -169,37 +162,35 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                        LOG.warn("Could not properly discard misc file 
states.", e);
                }
 
-               try {
-                       
StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard new sst file 
states.", e);
+               // If this was not registered, we can delete the shared state. 
We can simply apply this
+               // to all handles, because all handles that have not been 
created for the first time for this
+               // are only placeholders at this point (disposing them is a 
NOP).
+               if (sharedStateRegistry == null) {
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+                       } catch (Exception e) {
+                               LOG.warn("Could not properly discard new sst 
file states.", e);
+                       }
+               } else {
+                       // If this was registered, we only unregister all our 
referenced shared states
+                       // from the registry.
+                       for (StateHandleID stateHandleID : 
sharedState.keySet()) {
+                               sharedStateRegistry.unregisterReference(
+                                       
createSharedStateRegistryKeyFromFileName(stateHandleID));
+                       }
                }
-
        }
 
        @Override
        public long getStateSize() {
-               long size = getPrivateStateSize();
-
-               for (StreamStateHandle oldSstFileHandle : 
referencedSharedState.values()) {
-                       size += oldSstFileHandle.getStateSize();
-               }
-
-               return size;
-       }
-
-       /**
-        * Returns the size of the state that is privately owned by this handle.
-        */
-       public long getPrivateStateSize() {
                long size = StateUtil.getStateSize(metaStateHandle);
 
-               for (StreamStateHandle newSstFileHandle : 
createdSharedState.values()) {
-                       size += newSstFileHandle.getStateSize();
+               for (StreamStateHandle sharedStateHandle : 
sharedState.values()) {
+                       size += sharedStateHandle.getStateSize();
                }
 
-               for (StreamStateHandle miscFileHandle : privateState.values()) {
-                       size += miscFileHandle.getStateSize();
+               for (StreamStateHandle privateStateHandle : 
privateState.values()) {
+                       size += privateStateHandle.getStateSize();
                }
 
                return size;
@@ -208,64 +199,38 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        @Override
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-               Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
+               Preconditions.checkState(sharedStateRegistry == null, "The 
state handle has already registered its shared states.");
+
+               sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
-               for (Map.Entry<StateHandleID, StreamStateHandle> 
newSstFileEntry : createdSharedState.entrySet()) {
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
sharedStateHandle : sharedState.entrySet()) {
                        SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+                               
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
 
                        SharedStateRegistry.Result result =
-                               stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
-
-                       // We update our reference with the result from the 
registry, to prevent the following
-                       // problem:
+                               stateRegistry.registerReference(registryKey, 
sharedStateHandle.getValue());
+
+                       // This step consolidates our shared handles with the 
registry, which does two things:
+                       //
+                       // 1) Replace placeholder state handle with already 
registered, actual state handles.
+                       //
+                       // 2) Deduplicate re-uploads of incremental state due 
to missing confirmations about
+                       // completed checkpoints.
+                       //
+                       // This prevents the following problem:
                        // A previous checkpoint n has already registered the 
state. This can happen if a
                        // following checkpoint (n + x) wants to reference the 
same state before the backend got
                        // notified that checkpoint n completed. In this case, 
the shared registry did
                        // deduplication and returns the previous reference.
-                       newSstFileEntry.setValue(result.getReference());
-               }
-
-               for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName 
: referencedSharedState.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
-                       SharedStateRegistry.Result result = 
stateRegistry.obtainReference(registryKey);
-
-                       // Again we update our state handle with the result 
from the registry, thus replacing
-                       // placeholder state handles with the originals.
-                       oldSstFileName.setValue(result.getReference());
-               }
-
-               // Migrate state from unregistered to registered, so that it 
will not count as private state
-               // for #discardState() from now.
-               referencedSharedState.putAll(createdSharedState);
-               createdSharedState.clear();
-
-               registered = true;
-       }
-
-       @Override
-       public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
-               Preconditions.checkState(registered, "The state handle has not 
registered its shared states yet.");
-
-               for (Map.Entry<StateHandleID, StreamStateHandle> 
newSstFileEntry : createdSharedState.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-                       stateRegistry.releaseReference(registryKey);
-               }
-
-               for (Map.Entry<StateHandleID, StreamStateHandle> 
oldSstFileEntry : referencedSharedState.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
-                       stateRegistry.releaseReference(registryKey);
+                       sharedStateHandle.setValue(result.getReference());
                }
-
-               registered = false;
        }
 
-       private SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+       /**
+        * Create a unique key to register one of our shared state handles.
+        */
+       @VisibleForTesting
+       public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
                return new SharedStateRegistryKey(operatorIdentifier + '-' + 
keyGroupRange, shId);
        }
 
@@ -293,10 +258,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
                        return false;
                }
-               if 
(!getCreatedSharedState().equals(that.getCreatedSharedState())) {
-                       return false;
-               }
-               if 
(!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+               if (!getSharedState().equals(that.getSharedState())) {
                        return false;
                }
                if (!getPrivateState().equals(that.getPrivateState())) {
@@ -314,8 +276,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                int result = getOperatorIdentifier().hashCode();
                result = 31 * result + getKeyGroupRange().hashCode();
                result = 31 * result + (int) (getCheckpointId() ^ 
(getCheckpointId() >>> 32));
-               result = 31 * result + getCreatedSharedState().hashCode();
-               result = 31 * result + getReferencedSharedState().hashCode();
+               result = 31 * result + getSharedState().hashCode();
                result = 31 * result + getPrivateState().hashCode();
                result = 31 * result + getMetaStateHandle().hashCode();
                return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8280460..8e38ad4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -98,11 +98,6 @@ public class KeyGroupsStateHandle implements 
StreamStateHandle, KeyedStateHandle
        }
 
        @Override
-       public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-               // No shared states
-       }
-
-       @Override
        public void discardState() throws Exception {
                stateHandle.discardState();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 2136061..7c948a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -18,29 +18,20 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 /**
  * A placeholder state handle for shared state that will be replaced by an 
original that was
- * created in a previous checkpoint. So we don't have to send the handle 
twice, e.g. in
- * case of {@link ByteStreamStateHandle}. To be used in the referenced states 
of
+ * created in a previous checkpoint. So we don't have to send a state handle 
twice, e.g. in
+ * case of {@link ByteStreamStateHandle}. This class is used in the referenced 
states of
  * {@link IncrementalKeyedStateHandle}.
- * <p>
- * IMPORTANT: This class currently overrides equals and hash code only for 
testing purposes. They
- * should not be called from production code. This means this class is also 
not suited to serve as
- * a key, e.g. in hash maps.
  */
 public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
        private static final long serialVersionUID = 1L;
 
-       /** We remember the size of the original file for which this is a 
placeholder */
-       private final long originalSize;
-
-       public PlaceholderStreamStateHandle(long originalSize) {
-               this.originalSize = originalSize;
+       public PlaceholderStreamStateHandle() {
        }
 
        @Override
@@ -56,33 +47,6 @@ public class PlaceholderStreamStateHandle implements 
StreamStateHandle {
 
        @Override
        public long getStateSize() {
-               return originalSize;
-       }
-
-       /**
-        * This method is should only be called in tests! This should never 
serve as key in a hash map.
-        */
-       @VisibleForTesting
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               PlaceholderStreamStateHandle that = 
(PlaceholderStreamStateHandle) o;
-
-               return originalSize == that.originalSize;
-       }
-
-       /**
-        * This method is should only be called in tests! This should never 
serve as key in a hash map.
-        */
-       @VisibleForTesting
-       @Override
-       public int hashCode() {
-               return (int) (originalSize ^ (originalSize >>> 32));
+               return 0L;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index f9161b0..a5e0f84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -54,7 +54,7 @@ public class SharedStateRegistry {
        }
 
        /**
-        * Register a reference to the given (supposedly new) shared state in 
the registry.
+        * Register a reference to the given shared state in the registry.
         * This does the following: We check if the state handle is actually 
new by the
         * registrationKey. If it is new, we register it with a reference count 
of 1. If there is
         * already a state handle registered under the given key, we dispose 
the given "new" state
@@ -62,14 +62,14 @@ public class SharedStateRegistry {
         * a replacement with the result.
         *
         * <p>IMPORTANT: caller should check the state handle returned by the 
result, because the
-        * registry is performing deduplication and could potentially return a 
handle that is supposed
+        * registry is performing de-duplication and could potentially return a 
handle that is supposed
         * to replace the one from the registration request.
         *
         * @param state the shared state for which we register a reference.
         * @return the result of this registration request, consisting of the 
state handle that is
         * registered under the key by the end of the operation and its current 
reference count.
         */
-       public Result registerNewReference(SharedStateRegistryKey 
registrationKey, StreamStateHandle state) {
+       public Result registerReference(SharedStateRegistryKey registrationKey, 
StreamStateHandle state) {
 
                Preconditions.checkNotNull(state);
 
@@ -96,28 +96,6 @@ public class SharedStateRegistry {
        }
 
        /**
-        * Obtains one reference to the given shared state in the registry. 
This increases the
-        * reference count by one.
-        *
-        * @param registrationKey the shared state for which we obtain a 
reference.
-        * @return the shared state for which we release a reference.
-        * @return the result of the request, consisting of the reference count 
after this operation
-        * and the state handle.
-        */
-       public Result obtainReference(SharedStateRegistryKey registrationKey) {
-
-               Preconditions.checkNotNull(registrationKey);
-
-               synchronized (registeredStates) {
-                       SharedStateRegistry.SharedStateEntry entry =
-                               
Preconditions.checkNotNull(registeredStates.get(registrationKey),
-                                       "Could not find a state for the given 
registration key!");
-                       entry.increaseReferenceCount();
-                       return new Result(entry);
-               }
-       }
-
-       /**
         * Releases one reference to the given shared state in the registry. 
This decreases the
         * reference count by one. Once the count reaches zero, the shared 
state is deleted.
         *
@@ -125,7 +103,7 @@ public class SharedStateRegistry {
         * @return the result of the request, consisting of the reference count 
after this operation
         * and the state handle, or null if the state handle was deleted 
through this request.
         */
-       public Result releaseReference(SharedStateRegistryKey registrationKey) {
+       public Result unregisterReference(SharedStateRegistryKey 
registrationKey) {
 
                Preconditions.checkNotNull(registrationKey);
 
@@ -172,30 +150,18 @@ public class SharedStateRegistry {
                }
        }
 
-       /**
-        * Unregister all the shared states referenced by the given.
-        *
-        * @param stateHandles The shared states to unregister.
-        */
-       public void unregisterAll(Iterable<? extends CompositeStateHandle> 
stateHandles) {
-               if (stateHandles == null) {
-                       return;
-               }
-
-               synchronized (registeredStates) {
-                       for (CompositeStateHandle stateHandle : stateHandles) {
-                               stateHandle.unregisterSharedStates(this);
-                       }
-               }
-       }
-
        private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
-               if (streamStateHandle != null) {
+               // We do the small optimization to not issue discards for 
placeholders, which are NOPs.
+               if (streamStateHandle != null && 
!isPlaceholder(streamStateHandle)) {
                        asyncDisposalExecutor.execute(
                                new 
SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
                }
        }
 
+       private boolean isPlaceholder(StreamStateHandle stateHandle) {
+               return stateHandle instanceof PlaceholderStreamStateHandle;
+       }
+
        /**
         * An entry in the registry, tracking the handle and the corresponding 
reference count.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 42703f8..9ba9d35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -91,6 +91,13 @@ public class ByteStreamStateHandle implements 
StreamStateHandle {
                return 31 * handleName.hashCode();
        }
 
+       @Override
+       public String toString() {
+               return "ByteStreamStateHandle{" +
+                       "handleName='" + handleName + '\'' +
+                       '}';
+       }
+
        /**
         * An input stream view on a byte array.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9250634..3b44d9a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -639,12 +639,6 @@ public class CheckpointCoordinatorTest {
                        assertEquals(checkpointIdNew, 
successNew.getCheckpointID());
                        assertTrue(successNew.getOperatorStates().isEmpty());
 
-                       // validate that the subtask states in old savepoint 
have unregister their shared states
-                       {
-                               verify(subtaskState1, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                               verify(subtaskState2, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                       }
-
                        // validate that the relevant tasks got a confirmation 
message
                        {
                                verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
@@ -925,9 +919,6 @@ public class CheckpointCoordinatorTest {
                        verify(subtaskState1_2, times(1)).discardState();
 
                        // validate that all subtask states in the second 
checkpoint are not discarded
-                       verify(subtaskState2_1, 
never()).unregisterSharedStates(any(SharedStateRegistry.class));
-                       verify(subtaskState2_2, 
never()).unregisterSharedStates(any(SharedStateRegistry.class));
-                       verify(subtaskState2_3, 
never()).unregisterSharedStates(any(SharedStateRegistry.class));
                        verify(subtaskState2_1, never()).discardState();
                        verify(subtaskState2_2, never()).discardState();
                        verify(subtaskState2_3, never()).discardState();
@@ -951,9 +942,6 @@ public class CheckpointCoordinatorTest {
                        coord.shutdown(JobStatus.FINISHED);
 
                        // validate that the states in the second checkpoint 
have been discarded
-                       verify(subtaskState2_1, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                       verify(subtaskState2_2, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                       verify(subtaskState2_3, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
                        verify(subtaskState2_1, times(1)).discardState();
                        verify(subtaskState2_2, times(1)).discardState();
                        verify(subtaskState2_3, times(1)).discardState();
@@ -1562,10 +1550,6 @@ public class CheckpointCoordinatorTest {
                verify(subtaskState1, never()).discardState();
                verify(subtaskState2, never()).discardState();
 
-               // Savepoints are not supposed to have any shared state.
-               verify(subtaskState1, 
never()).unregisterSharedStates(any(SharedStateRegistry.class));
-               verify(subtaskState2, 
never()).unregisterSharedStates(any(SharedStateRegistry.class));
-
                // validate that the relevant tasks got a confirmation message
                {
                        verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), 
any(CheckpointOptions.class));
@@ -2088,15 +2072,6 @@ public class CheckpointCoordinatorTest {
                // shutdown the store
                store.shutdown(JobStatus.SUSPENDED);
 
-               // All shared states should be unregistered once the store is 
shut down
-               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
-                       for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
-                               for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
-                                       verify(subtaskState, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                               }
-                       }
-               }
-
                // restore the store
                Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 985c662..fb5d7c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,11 +37,6 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
  * Test for basic {@link CompletedCheckpointStore} contract.
@@ -114,12 +109,6 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        expected[i - 1].awaitDiscard();
                        assertTrue(expected[i - 1].isDiscarded());
                        assertEquals(1, 
checkpoints.getNumberOfRetainedCheckpoints());
-
-                       for (OperatorState operatorState : taskStates) {
-                               for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
-                                       verify(subtaskState, 
times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-                               }
-                       }
                }
        }
 
@@ -209,7 +198,8 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                operatorGroupState.put(operatorID, operatorState);
 
                for (int i = 0; i < numberOfStates; i++) {
-                       OperatorSubtaskState subtaskState = 
mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState =
+                               new TestOperatorSubtaskState();
 
                        operatorState.putState(i, subtaskState);
                }
@@ -217,18 +207,10 @@ public abstract class CompletedCheckpointStoreTest 
extends TestLogger {
                return new TestCompletedCheckpoint(new JobID(), id, 0, 
operatorGroupState, props);
        }
 
-       protected void resetCheckpoint(Collection<OperatorState> 
operatorStates) {
-               for (OperatorState operatorState : operatorStates) {
-                       for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
-                               Mockito.reset(subtaskState);
-                       }
-               }
-       }
-
        protected void verifyCheckpointRegistered(Collection<OperatorState> 
operatorStates, SharedStateRegistry registry) {
                for (OperatorState operatorState : operatorStates) {
                        for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
-                               verify(subtaskState, 
times(1)).registerSharedStates(eq(registry));
+                               
Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).registered);
                        }
                }
        }
@@ -236,7 +218,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
        protected void verifyCheckpointDiscarded(Collection<OperatorState> 
operatorStates) {
                for (OperatorState operatorState : operatorStates) {
                        for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
-                               verify(subtaskState, times(1)).discardState();
+                               
Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
                        }
                }
        }
@@ -333,4 +315,37 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                }
        }
 
+       static class TestOperatorSubtaskState extends OperatorSubtaskState {
+               private static final long serialVersionUID = 
522580433699164230L;
+
+               boolean registered;
+               boolean discarded;
+
+               public TestOperatorSubtaskState() {
+                       super(null, null, null, null, null);
+                       this.registered = false;
+                       this.discarded = false;
+               }
+
+               @Override
+               public void discardState() {
+                       super.discardState();
+                       Assert.assertFalse(discarded);
+                       discarded = true;
+                       registered = false;
+               }
+
+               @Override
+               public void registerSharedStates(SharedStateRegistry 
sharedStateRegistry) {
+                       super.registerSharedStates(sharedStateRegistry);
+                       Assert.assertFalse(discarded);
+                       registered = true;
+               }
+
+               public void reset() {
+                       registered = false;
+                       discarded = false;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 589ff46..0bbb961 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -100,7 +100,6 @@ public class CompletedCheckpointTest {
                checkpoint.discardOnSubsume(sharedStateRegistry);
 
                verify(state, times(1)).discardState();
-               verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
        }
 
        /**
@@ -138,7 +137,6 @@ public class CompletedCheckpointTest {
                        checkpoint.discardOnShutdown(status, sharedStateRegistry);
                        verify(state, times(0)).discardState();
                        assertEquals(true, file.exists());
-                       verify(state, times(0)).unregisterSharedStates(sharedStateRegistry);
 
                        // Discard
                        props = new CheckpointProperties(false, false, true, true, true, true, true);
@@ -152,7 +150,6 @@ public class CompletedCheckpointTest {
 
                        checkpoint.discardOnShutdown(status, sharedStateRegistry);
                        verify(state, times(1)).discardState();
-                       verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6df01a0..a96b597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -197,7 +197,6 @@ public class PendingCheckpointTest {
 
                OperatorState state = mock(OperatorState.class);
                doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
-               doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class));
 
                String targetDir = tmpFolder.newFolder().getAbsolutePath();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 0d93289..44c802b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
@@ -100,11 +101,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
                assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
-               resetCheckpoint(expected[0].getOperatorStates().values());
-               resetCheckpoint(expected[1].getOperatorStates().values());
-               resetCheckpoint(expected[2].getOperatorStates().values());
-
-               // Recover TODO!!! clear registry!
+               // Recover
                checkpoints.recover();
 
                assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index b63782d..f985573 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -273,18 +272,17 @@ public class CheckpointTestUtils {
        private CheckpointTestUtils() {}
 
 
-       private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+       public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
                return new IncrementalKeyedStateHandle(
                        createRandomUUID(rnd).toString(),
                        new KeyGroupRange(1, 1),
                        42L,
-                       createRandomOwnedHandleMap(rnd),
-                       createRandomReferencedHandleMap(rnd),
-                       createRandomOwnedHandleMap(rnd),
+                       createRandomStateHandleMap(rnd),
+                       createRandomStateHandleMap(rnd),
                        createDummyStreamStateHandle(rnd));
        }
 
-       private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) {
+       public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) {
                final int size = rnd.nextInt(4);
                Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
                for (int i = 0; i < size; ++i) {
@@ -296,24 +294,13 @@ public class CheckpointTestUtils {
                return result;
        }
 
-       private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) {
-               final int size = rnd.nextInt(4);
-               Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
-               for (int i = 0; i < size; ++i) {
-                       StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
-                       result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024)));
-               }
-
-               return result;
-       }
-
-       private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+       public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
                return new KeyGroupsStateHandle(
                        new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
                        createDummyStreamStateHandle(rnd));
        }
 
-       private static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
+       public static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
                return new TestByteStreamStateHandleDeepCompare(
                        String.valueOf(createRandomUUID(rnd)),
                        String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));

Reply via email to