Repository: flink
Updated Branches:
  refs/heads/master b93396c5a -> 040356391


[FLINK-6640] Ensure registration of shared state happens before externalizing a checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04035639
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04035639
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04035639

Branch: refs/heads/master
Commit: 040356391f621858199948b15160b1e382152cc1
Parents: b93396c
Author: Stefan Richter <[email protected]>
Authored: Fri May 19 16:46:06 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sat May 20 15:45:45 2017 +0200

----------------------------------------------------------------------
 .../AbstractCompletedCheckpointStore.java       | 43 --------------
 .../checkpoint/CheckpointCoordinator.java       | 17 +++---
 .../runtime/checkpoint/CompletedCheckpoint.java |  6 +-
 .../checkpoint/CompletedCheckpointStore.java    |  5 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  1 -
 .../StandaloneCompletedCheckpointStore.java     | 11 ++--
 .../ZooKeeperCompletedCheckpointStore.java      | 26 ++++-----
 .../runtime/state/SharedStateRegistry.java      | 16 +++++-
 .../CheckpointCoordinatorFailureTest.java       |  3 +-
 .../CompletedCheckpointStoreTest.java           | 33 +++++++----
 .../checkpoint/CompletedCheckpointTest.java     | 14 ++---
 .../StandaloneCompletedCheckpointStoreTest.java | 21 +++----
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 60 ++++++++++++--------
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  4 +-
 .../RecoverableCompletedCheckpointStore.java    | 13 ++---
 .../JobManagerHACheckpointRecoveryITCase.java   | 37 +++++++-----
 16 files changed, 156 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
deleted file mode 100644
index bf70501..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.runtime.state.SharedStateRegistry;
-
-import java.util.concurrent.Executor;
-
-/**
- * This is the base class that provides implementation of some aspects common 
for all
- * {@link CompletedCheckpointStore}s.
- */
-public abstract class AbstractCompletedCheckpointStore implements 
CompletedCheckpointStore {
-
-       /**
-        * Registry for shared states.
-        */
-       protected final SharedStateRegistry sharedStateRegistry;
-
-       public AbstractCompletedCheckpointStore() {
-               this.sharedStateRegistry = new SharedStateRegistry();
-       }
-
-       public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
-               this.sharedStateRegistry = new 
SharedStateRegistry(asyncIOExecutor);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0bbf6b5..e224780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,11 +40,11 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +173,9 @@ public class CheckpointCoordinator {
        @Nullable
        private CheckpointStatsTracker statsTracker;
 
+       /** Registry that tracks state which is shared across (incremental) 
checkpoints */
+       private final SharedStateRegistry sharedStateRegistry;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -226,6 +229,7 @@ public class CheckpointCoordinator {
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.checkpointDirectory = checkpointDirectory;
                this.executor = checkNotNull(executor);
+               this.sharedStateRegistry = new SharedStateRegistry(executor);
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.masterHooks = new HashMap<>();
@@ -836,6 +840,10 @@ public class CheckpointCoordinator {
                final long checkpointId = pendingCheckpoint.getCheckpointId();
                final CompletedCheckpoint completedCheckpoint;
 
+               // As a first step to complete the checkpoint, we register its 
state with the registry
+               Map<OperatorID, OperatorState> operatorStates = 
pendingCheckpoint.getOperatorStates();
+               sharedStateRegistry.registerAll(operatorStates.values());
+
                try {
                        try {
                                // externalize the checkpoint if required
@@ -1003,7 +1011,7 @@ public class CheckpointCoordinator {
                        }
 
                        // Recover the checkpoints
-                       completedCheckpointStore.recover();
+                       completedCheckpointStore.recover(sharedStateRegistry);
 
                        // restore from the latest checkpoint
                        CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
@@ -1120,11 +1128,6 @@ public class CheckpointCoordinator {
                return completedCheckpointStore;
        }
 
-//     @VisibleForTesting
-//     SharedStateRegistry getSharedStateRegistry() {
-//             return sharedStateRegistry;
-//     }
-
        public CheckpointIDCounter getCheckpointIdCounter() {
                return checkpointIdCounter;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index b382080..7c3edee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -178,7 +178,7 @@ public class CompletedCheckpoint implements Serializable {
                doDiscard();
        }
 
-       public boolean discardOnSubsume(SharedStateRegistry 
sharedStateRegistry) throws Exception {
+       public boolean discardOnSubsume() throws Exception {
 
                if (props.discardOnSubsumed()) {
                        doDiscard();
@@ -188,7 +188,7 @@ public class CompletedCheckpoint implements Serializable {
                return false;
        }
 
-       public boolean discardOnShutdown(JobStatus jobStatus, 
SharedStateRegistry sharedStateRegistry) throws Exception {
+       public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
 
                if (jobStatus == JobStatus.FINISHED && 
props.discardOnJobFinished() ||
                                jobStatus == JobStatus.CANCELED && 
props.discardOnJobCancelled() ||
@@ -290,7 +290,7 @@ public class CompletedCheckpoint implements Serializable {
         *
         * @param sharedStateRegistry The registry where shared states are 
registered
         */
-       public void registerSharedStates(SharedStateRegistry 
sharedStateRegistry) {
+       public void registerSharedStatesAfterRestored(SharedStateRegistry 
sharedStateRegistry) {
                sharedStateRegistry.registerAll(operatorStates.values());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 82193b5..45d407e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -32,8 +33,10 @@ public interface CompletedCheckpointStore {
         *
         * <p>After a call to this method, {@link #getLatestCheckpoint()} 
returns the latest
         * available checkpoint.
+        *
+        * @param sharedStateRegistry the shared state registry to register 
recovered states.
         */
-       void recover() throws Exception;
+       void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
 
        /**
         * Adds a {@link CompletedCheckpoint} instance to the list of completed 
checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370032a..0633fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 233cfc8..fbb0198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link 
HighAvailabilityMode#NONE}.
  */
-public class StandaloneCompletedCheckpointStore extends 
AbstractCompletedCheckpointStore {
+public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
 
@@ -56,21 +57,19 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompletedCheckpo
        }
 
        @Override
-       public void recover() throws Exception {
+       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
                // Nothing to do
        }
 
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-               
-               checkpoint.registerSharedStates(sharedStateRegistry);
 
                checkpoints.addLast(checkpoint);
 
                if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
                        try {
                                CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
-                               
checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+                               checkpointToSubsume.discardOnSubsume();
                        } catch (Exception e) {
                                LOG.warn("Fail to subsume the old checkpoint.", 
e);
                        }
@@ -103,7 +102,7 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompletedCheckpo
                        LOG.info("Shutting down");
 
                        for (CompletedCheckpoint checkpoint : checkpoints) {
-                               checkpoint.discardOnShutdown(jobStatus, 
sharedStateRegistry);
+                               checkpoint.discardOnShutdown(jobStatus);
                        }
                } finally {
                        checkpoints.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4c3c1ff..469c1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -64,7 +64,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * checkpoints is consistent. Currently, after recovery we start out with only 
a single
  * checkpoint to circumvent those situations.
  */
-public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpointStore {
+public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
@@ -102,8 +102,6 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                        RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage,
                        Executor executor) throws Exception {
 
-               super(executor);
-
                checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
                checkNotNull(stateStorage, "State storage");
 
@@ -139,13 +137,14 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
         * that the history of checkpoints is consistent.
         */
        @Override
-       public void recover() throws Exception {
+       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
                LOG.info("Recovering checkpoints from ZooKeeper.");
 
                // Clear local handles in order to prevent duplicates on
                // recovery. The local handles should reflect the state
                // of ZooKeeper.
                completedCheckpoints.clear();
+               sharedStateRegistry.clear();
 
                // Get all there is first
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
@@ -171,7 +170,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                                completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
                                if (completedCheckpoint != null) {
                                        // Re-register all shared states in the 
checkpoint.
-                                       
completedCheckpoint.registerSharedStates(sharedStateRegistry);
+                                       
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                                        
completedCheckpoints.add(completedCheckpoint);
                                }
                        } catch (Exception e) {
@@ -195,9 +194,6 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                
                final String path = 
checkpointIdToPath(checkpoint.getCheckpointID());
 
-               // First, register all shared states in the checkpoint to 
consolidates placeholder.
-               checkpoint.registerSharedStates(sharedStateRegistry);
-
                // Now add the new one. If it fails, we don't want to loose 
existing data.
                checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
@@ -206,7 +202,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
                        try {
-                               
removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
+                               
removeSubsumed(completedCheckpoints.removeFirst());
                        } catch (Exception e) {
                                LOG.warn("Failed to subsume the old 
checkpoint", e);
                        }
@@ -248,7 +244,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                        for (CompletedCheckpoint checkpoint : 
completedCheckpoints) {
                                try {
-                                       removeShutdown(checkpoint, jobStatus, 
sharedStateRegistry);
+                                       removeShutdown(checkpoint, jobStatus);
                                } catch (Exception e) {
                                        LOG.error("Failed to discard 
checkpoint.", e);
                                }
@@ -274,8 +270,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
        // 
------------------------------------------------------------------------
 
        private void removeSubsumed(
-               final CompletedCheckpoint completedCheckpoint,
-               final SharedStateRegistry sharedStateRegistry) throws Exception 
{
+               final CompletedCheckpoint completedCheckpoint) throws Exception 
{
 
                if(completedCheckpoint == null) {
                        return;
@@ -287,7 +282,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                                public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
                                        if (value != null) {
                                                try {
-                                                       
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+                                                       
completedCheckpoint.discardOnSubsume();
                                                } catch (Exception e) {
                                                        throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
                                                }
@@ -302,8 +297,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
        private void removeShutdown(
                        final CompletedCheckpoint completedCheckpoint,
-                       final JobStatus jobStatus,
-                       final SharedStateRegistry sharedStateRegistry) throws 
Exception {
+                       final JobStatus jobStatus) throws Exception {
 
                if(completedCheckpoint == null) {
                        return;
@@ -313,7 +307,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                        @Override
                        public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
                                try {
-                                       
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+                                       
completedCheckpoint.discardOnShutdown(jobStatus);
                                } catch (Exception e) {
                                        throw new FlinkException("Could not 
discard the completed checkpoint on subsume.", e);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index a5e0f84..949839b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -29,10 +29,13 @@ import java.util.Objects;
 import java.util.concurrent.Executor;
 
 /**
+ * This registry manages state that is shared across (incremental) 
checkpoints, and is responsible
+ * for deleting shared state that is no longer used in any valid checkpoint.
+ *
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
- * maintain the reference count of {@link StreamStateHandle}s which are shared
- * among different incremental checkpoints.
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of {@link StreamStateHandle}s by a key that 
(logically) identifies
+ * them.
  */
 public class SharedStateRegistry {
 
@@ -247,4 +250,11 @@ public class SharedStateRegistry {
                        }
                }
        }
+
+       /**
+        * Clears the registry.
+        */
+       public void clear() {
+               registeredStates.clear();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 6e20be3..344b340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
        private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
                @Override
-               public void recover() throws Exception {
+               public void recover(SharedStateRegistry sharedStateRegistry) 
throws Exception {
                        throw new UnsupportedOperationException("Not 
implemented.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index fb5d7c3..1fe4e65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -64,6 +64,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
         */
        @Test
        public void testAddAndGetLatestCheckpoint() throws Exception {
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
                
                // Empty state
@@ -71,7 +72,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                assertEquals(0, checkpoints.getAllCheckpoints().size());
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
-                               createCheckpoint(0), createCheckpoint(1) };
+                               createCheckpoint(0, sharedStateRegistry), 
createCheckpoint(1, sharedStateRegistry) };
 
                // Add and get latest
                checkpoints.addCheckpoint(expected[0]);
@@ -89,11 +90,12 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
         */
        @Test
        public void testAddCheckpointMoreThanMaxRetained() throws Exception {
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(1);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
-                               createCheckpoint(0), createCheckpoint(1),
-                               createCheckpoint(2), createCheckpoint(3)
+                               createCheckpoint(0, sharedStateRegistry), 
createCheckpoint(1, sharedStateRegistry),
+                               createCheckpoint(2, sharedStateRegistry), 
createCheckpoint(3, sharedStateRegistry)
                };
 
                // Add checkpoints
@@ -134,11 +136,12 @@ public abstract class CompletedCheckpointStoreTest 
extends TestLogger {
         */
        @Test
        public void testGetAllCheckpoints() throws Exception {
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
-                               createCheckpoint(0), createCheckpoint(1),
-                               createCheckpoint(2), createCheckpoint(3)
+                               createCheckpoint(0, sharedStateRegistry), 
createCheckpoint(1, sharedStateRegistry),
+                               createCheckpoint(2, sharedStateRegistry), 
createCheckpoint(3, sharedStateRegistry)
                };
 
                for (TestCompletedCheckpoint checkpoint : expected) {
@@ -159,11 +162,12 @@ public abstract class CompletedCheckpointStoreTest 
extends TestLogger {
         */
        @Test
        public void testDiscardAllCheckpoints() throws Exception {
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(4);
 
                TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
-                               createCheckpoint(0), createCheckpoint(1),
-                               createCheckpoint(2), createCheckpoint(3)
+                               createCheckpoint(0, sharedStateRegistry), 
createCheckpoint(1, sharedStateRegistry),
+                               createCheckpoint(2, sharedStateRegistry), 
createCheckpoint(3, sharedStateRegistry)
                };
 
                for (TestCompletedCheckpoint checkpoint : expected) {
@@ -187,7 +191,10 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
 
        // 
---------------------------------------------------------------------------------------------
 
-       protected TestCompletedCheckpoint createCheckpoint(int id) throws 
IOException {
+       protected TestCompletedCheckpoint createCheckpoint(
+               int id,
+               SharedStateRegistry sharedStateRegistry) throws IOException {
+
                int numberOfStates = 4;
                CheckpointProperties props = 
CheckpointProperties.forStandardCheckpoint();
 
@@ -204,6 +211,8 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        operatorState.putState(i, subtaskState);
                }
 
+               operatorState.registerSharedStates(sharedStateRegistry);
+
                return new TestCompletedCheckpoint(new JobID(), id, 0, 
operatorGroupState, props);
        }
 
@@ -251,8 +260,8 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                }
 
                @Override
-               public boolean discardOnSubsume(SharedStateRegistry 
sharedStateRegistry) throws Exception {
-                       if (super.discardOnSubsume(sharedStateRegistry)) {
+               public boolean discardOnSubsume() throws Exception {
+                       if (super.discardOnSubsume()) {
                                discard();
                                return true;
                        } else {
@@ -261,8 +270,8 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                }
 
                @Override
-               public boolean discardOnShutdown(JobStatus jobStatus, 
SharedStateRegistry sharedStateRegistry) throws Exception {
-                       if (super.discardOnShutdown(jobStatus, 
sharedStateRegistry)) {
+               public boolean discardOnShutdown(JobStatus jobStatus) throws 
Exception {
+                       if (super.discardOnShutdown(jobStatus)) {
                                discard();
                                return true;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0bbb961..4846879 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -67,7 +67,7 @@ public class CompletedCheckpointTest {
                                new FileStateHandle(new Path(file.toURI()), 
file.length()),
                                file.getAbsolutePath());
 
-               checkpoint.discardOnShutdown(JobStatus.FAILED, new 
SharedStateRegistry());
+               checkpoint.discardOnShutdown(JobStatus.FAILED);
 
                assertEquals(false, file.exists());
        }
@@ -93,11 +93,11 @@ public class CompletedCheckpointTest {
                                null);
 
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-               checkpoint.registerSharedStates(sharedStateRegistry);
+               
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                verify(state, 
times(1)).registerSharedStates(sharedStateRegistry);
 
                // Subsume
-               checkpoint.discardOnSubsume(sharedStateRegistry);
+               checkpoint.discardOnSubsume();
 
                verify(state, times(1)).discardState();
        }
@@ -132,9 +132,9 @@ public class CompletedCheckpointTest {
                                        externalPath);
 
                        SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-                       checkpoint.registerSharedStates(sharedStateRegistry);
+                       
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 
-                       checkpoint.discardOnShutdown(status, 
sharedStateRegistry);
+                       checkpoint.discardOnShutdown(status);
                        verify(state, times(0)).discardState();
                        assertEquals(true, file.exists());
 
@@ -148,7 +148,7 @@ public class CompletedCheckpointTest {
                                        null,
                                        null);
 
-                       checkpoint.discardOnShutdown(status, 
sharedStateRegistry);
+                       checkpoint.discardOnShutdown(status);
                        verify(state, times(1)).discardState();
                }
        }
@@ -176,7 +176,7 @@ public class CompletedCheckpointTest {
                CompletedCheckpointStats.DiscardCallback callback = 
mock(CompletedCheckpointStats.DiscardCallback.class);
                completed.setDiscardCallback(callback);
 
-               completed.discardOnShutdown(JobStatus.FINISHED, new 
SharedStateRegistry());
+               completed.discardOnShutdown(JobStatus.FINISHED);
                verify(callback, times(1)).notifyDiscardedCheckpoint();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index be94762..6f3c60b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -41,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointStoreTest {
 
        @Override
-       protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
+       protected CompletedCheckpointStore createCompletedCheckpoints(
                        int maxNumberOfCheckpointsToRetain) throws Exception {
 
                return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
@@ -52,13 +51,14 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
         */
        @Test
        public void testShutdownDiscardsCheckpoints() throws Exception {
-               AbstractCompletedCheckpointStore store = 
createCompletedCheckpoints(1);
-               TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
+               TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
                Collection<OperatorState> operatorStates = 
checkpoint.getOperatorStates().values();
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
-               verifyCheckpointRegistered(operatorStates, 
store.sharedStateRegistry);
+               verifyCheckpointRegistered(operatorStates, sharedStateRegistry);
 
                store.shutdown(JobStatus.FINISHED);
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -72,13 +72,14 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
         */
        @Test
        public void testSuspendDiscardsCheckpoints() throws Exception {
-               AbstractCompletedCheckpointStore store = 
createCompletedCheckpoints(1);
-               TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               CompletedCheckpointStore store = createCompletedCheckpoints(1);
+               TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
                Collection<OperatorState> taskStates = 
checkpoint.getOperatorStates().values();
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
-               verifyCheckpointRegistered(taskStates, 
store.sharedStateRegistry);
+               verifyCheckpointRegistered(taskStates, sharedStateRegistry);
 
                store.shutdown(JobStatus.SUSPENDED);
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -92,7 +93,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
         */
        @Test
        public void testAddCheckpointWithFailedRemove() throws Exception {
-               
+
                final int numCheckpointsToRetain = 1;
                CompletedCheckpointStore store = 
createCompletedCheckpoints(numCheckpointsToRetain);
                
@@ -100,7 +101,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
                        CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
                        doReturn(i).when(checkpointToAdd).getCheckpointID();
                        
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-                       doThrow(new 
IOException()).when(checkpointToAdd).discardOnSubsume(any(SharedStateRegistry.class));
+                       doThrow(new 
IOException()).when(checkpointToAdd).discardOnSubsume();
                        
                        try {
                                store.addCheckpoint(checkpointToAdd);

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 44c802b..81ee4f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.zookeeper.data.Stat;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
@@ -82,10 +82,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         */
        @Test
        public void testRecover() throws Exception {
-               AbstractCompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(3);
 
-               TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[] {
-                               createCheckpoint(0), createCheckpoint(1), 
createCheckpoint(2)
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               CompletedCheckpointStore checkpoints = 
createCompletedCheckpoints(3);
+
+               TestCompletedCheckpoint[] expected = new 
TestCompletedCheckpoint[]{
+                       createCheckpoint(0, sharedStateRegistry),
+                       createCheckpoint(1, sharedStateRegistry),
+                       createCheckpoint(2, sharedStateRegistry)
                };
 
                // Add multiple checkpoints
@@ -93,16 +97,17 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                checkpoints.addCheckpoint(expected[1]);
                checkpoints.addCheckpoint(expected[2]);
 
-               
verifyCheckpointRegistered(expected[0].getOperatorStates().values(), 
checkpoints.sharedStateRegistry);
-               
verifyCheckpointRegistered(expected[1].getOperatorStates().values(), 
checkpoints.sharedStateRegistry);
-               
verifyCheckpointRegistered(expected[2].getOperatorStates().values(), 
checkpoints.sharedStateRegistry);
+               
verifyCheckpointRegistered(expected[0].getOperatorStates().values(), 
sharedStateRegistry);
+               
verifyCheckpointRegistered(expected[1].getOperatorStates().values(), 
sharedStateRegistry);
+               
verifyCheckpointRegistered(expected[2].getOperatorStates().values(), 
sharedStateRegistry);
 
                // All three should be in ZK
                assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
                // Recover
-               checkpoints.recover();
+               sharedStateRegistry.clear();
+               checkpoints.recover(sharedStateRegistry);
 
                assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -111,7 +116,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                List<CompletedCheckpoint> expectedCheckpoints = new 
ArrayList<>(3);
                expectedCheckpoints.add(expected[1]);
                expectedCheckpoints.add(expected[2]);
-               expectedCheckpoints.add(createCheckpoint(3));
+               expectedCheckpoints.add(createCheckpoint(3, 
sharedStateRegistry));
 
                checkpoints.addCheckpoint(expectedCheckpoints.get(2));
 
@@ -120,7 +125,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(expectedCheckpoints, actualCheckpoints);
 
                for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
-                       
verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), 
checkpoints.sharedStateRegistry);
+                       
verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), 
sharedStateRegistry);
                }
        }
 
@@ -131,8 +136,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
        public void testShutdownDiscardsCheckpoints() throws Exception {
                CuratorFramework client = ZOOKEEPER.getClient();
 
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore store = createCompletedCheckpoints(1);
-               TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+               TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -142,7 +148,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-               store.recover();
+               sharedStateRegistry.clear();
+               store.recover(sharedStateRegistry);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
        }
@@ -156,8 +163,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
        public void testSuspendKeepsCheckpoints() throws Exception {
                CuratorFramework client = ZOOKEEPER.getClient();
 
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                CompletedCheckpointStore store = createCompletedCheckpoints(1);
-               TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+               TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -174,7 +182,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals("The checkpoint node should not be locked.", 0, 
stat.getNumChildren());
 
                // Recover again
-               store.recover();
+               sharedStateRegistry.clear();
+               store.recover(sharedStateRegistry);
 
                CompletedCheckpoint recovered = store.getLatestCheckpoint();
                assertEquals(checkpoint, recovered);
@@ -188,18 +197,20 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
        @Test
        public void testLatestCheckpointRecovery() throws Exception {
                final int numCheckpoints = 3;
-               AbstractCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(numCheckpoints);
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               CompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(numCheckpoints);
                List<CompletedCheckpoint> checkpoints = new 
ArrayList<>(numCheckpoints);
 
-               checkpoints.add(createCheckpoint(9));
-               checkpoints.add(createCheckpoint(10));
-               checkpoints.add(createCheckpoint(11));
+               checkpoints.add(createCheckpoint(9, sharedStateRegistry));
+               checkpoints.add(createCheckpoint(10, sharedStateRegistry));
+               checkpoints.add(createCheckpoint(11, sharedStateRegistry));
 
                for (CompletedCheckpoint checkpoint : checkpoints) {
                        checkpointStore.addCheckpoint(checkpoint);
                }
 
-               checkpointStore.recover();
+               sharedStateRegistry.clear();
+               checkpointStore.recover(sharedStateRegistry);
 
                CompletedCheckpoint latestCheckpoint = 
checkpointStore.getLatestCheckpoint();
 
@@ -220,13 +231,16 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = 
createCompletedCheckpoints(numberOfCheckpoints);
                ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = 
createCompletedCheckpoints(numberOfCheckpoints);
 
-               TestCompletedCheckpoint completedCheckpoint = 
createCheckpoint(1);
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+
+               TestCompletedCheckpoint completedCheckpoint = 
createCheckpoint(1, sharedStateRegistry);
 
                // complete the first checkpoint
                zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
                // recover the checkpoint by a different checkpoint store
-               zkCheckpointStore2.recover();
+               sharedStateRegistry.clear();
+               zkCheckpointStore2.recover(sharedStateRegistry);
 
                CompletedCheckpoint recoveredCheckpoint = 
zkCheckpointStore2.getLatestCheckpoint();
                assertTrue(recoveredCheckpoint instanceof 
TestCompletedCheckpoint);
@@ -237,7 +251,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
 
                // complete another checkpoint --> this should remove the first 
checkpoint from the store
                // because the number of retained checkpoints == 1
-               TestCompletedCheckpoint completedCheckpoint2 = 
createCheckpoint(2);
+               TestCompletedCheckpoint completedCheckpoint2 = 
createCheckpoint(2, sharedStateRegistry);
                zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
 
                List<CompletedCheckpoint> allCheckpoints = 
zkCheckpointStore1.getAllCheckpoints();
@@ -251,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                // check that we have not discarded the first completed 
checkpoint
                assertFalse(recoveredTestCheckpoint.isDiscarded());
 
-               TestCompletedCheckpoint completedCheckpoint3 = 
createCheckpoint(3);
+               TestCompletedCheckpoint completedCheckpoint3 = 
createCheckpoint(3, sharedStateRegistry);
 
                // this should release the last lock on completedCheckoint and 
thus discard it
                zkCheckpointStore2.addCheckpoint(completedCheckpoint3);

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7d22d8e..23cc8c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.curator.utils.EnsurePath;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
@@ -158,7 +159,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                        stateStorage,
                        Executors.directExecutor());
 
-               zooKeeperCompletedCheckpointStore.recover();
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
 
                CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 2251e46..a0c4412 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.testutils;
 
-import org.apache.flink.runtime.checkpoint.AbstractCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ import java.util.List;
  * A checkpoint store, which supports shutdown and suspend. You can use this 
to test HA
  * as long as the factory always returns the same store instance.
  */
-public class RecoverableCompletedCheckpointStore extends 
AbstractCompletedCheckpointStore {
+public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class);
 
@@ -41,26 +42,24 @@ public class RecoverableCompletedCheckpointStore extends 
AbstractCompletedCheckp
        private final ArrayDeque<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
 
        @Override
-       public void recover() throws Exception {
+       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
                checkpoints.addAll(suspended);
                suspended.clear();
 
                for (CompletedCheckpoint checkpoint : checkpoints) {
-                       checkpoint.registerSharedStates(sharedStateRegistry);
+                       
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                }
        }
 
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
 
-               checkpoint.registerSharedStates(sharedStateRegistry);
-
                checkpoints.addLast(checkpoint);
 
 
                if (checkpoints.size() > 1) {
                        CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
-                       
checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+                       checkpointToSubsume.discardOnSubsume();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f9af603..33c3454 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -147,6 +148,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
        private static final int Parallelism = 8;
 
        private static CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(4);
+       private static CountDownLatch CompletedCheckpointsLatch2 = new 
CountDownLatch(6);
 
        private static AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
 
@@ -171,8 +173,8 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                final String fileStateBackendPath = 
FileStateBackendBasePath.getAbsoluteFile().toString();
 
                Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
-                               zooKeeperQuorum,
-                               fileStateBackendPath);
+                       zooKeeperQuorum,
+                       fileStateBackendPath);
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
 
@@ -188,7 +190,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                try {
                        // Test actor system
                        testActorSystem = AkkaUtils.createActorSystem(new 
Configuration(),
-                                       new Some<>(new Tuple2<String, 
Object>("localhost", 0)));
+                               new Some<>(new Tuple2<String, 
Object>("localhost", 0)));
 
                        // The job managers
                        jobManagerProcess[0] = new JobManagerProcess(0, config);
@@ -204,7 +206,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                        // The task manager
                        taskManagerSystem = AkkaUtils.createActorSystem(
-                                       config, Option.apply(new Tuple2<String, 
Object>("localhost", 0)));
+                               config, Option.apply(new Tuple2<String, 
Object>("localhost", 0)));
                        TaskManager.startTaskManagerComponentsAndActor(
                                config,
                                ResourceID.generate(),
@@ -223,7 +225,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                        // Get the leader ref
                        ActorRef leaderRef = AkkaUtils.getActorRef(
-                                       leaderAddress, testActorSystem, 
testDeadline.timeLeft());
+                               leaderAddress, testActorSystem, 
testDeadline.timeLeft());
                        ActorGateway leader = new AkkaActorGateway(leaderRef, 
leaderId);
 
                        // Who's the boss?
@@ -248,10 +250,10 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                        // Wait for the job to be running
                        JobManagerActorTestUtils.waitForJobStatus(
-                                       jobGraph.getJobID(),
-                                       JobStatus.RUNNING,
-                                       leader,
-                                       testDeadline.timeLeft());
+                               jobGraph.getJobID(),
+                               JobStatus.RUNNING,
+                               leader,
+                               testDeadline.timeLeft());
 
                        // Remove all files
                        FileUtils.deleteDirectory(FileStateBackendBasePath);
@@ -268,7 +270,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                                if (output != null) {
                                        if (output.contains("Failed to recover 
job") &&
-                                                       
output.contains("java.io.FileNotFoundException")) {
+                                               
output.contains("java.io.FileNotFoundException")) {
 
                                                success = true;
                                                break;
@@ -352,10 +354,12 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                try {
                        Configuration config = new Configuration();
+
                        config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, 
retainedCheckpoints);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+                       
config.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, 
temporaryFolder.newFolder().toString());
 
 
                        String tmpFolderString = 
temporaryFolder.newFolder().toString();
@@ -372,6 +376,8 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(Parallelism);
                        env.enableCheckpointing(checkpointingInterval);
+                       env.getCheckpointConfig()
+                               
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
                        //TODO parameterize
                        env.setStateBackend(stateBackend);
@@ -427,7 +433,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
         * A checkpointed source, which emits elements from 0 to a configured 
number.
         */
        public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
-                       implements ListCheckpointed<Tuple2<Long, Integer>> {
+               implements ListCheckpointed<Tuple2<Long, Integer>> {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -465,8 +471,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                                --repeat;
                                                current = 0;
                                        } else {
-                                               ctx.collect(LastElement);
-                                               return;
+                                               isRunning = false;
                                        }
                                }
 
@@ -475,6 +480,11 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                        Thread.sleep(50);
                                }
                        }
+
+                       CompletedCheckpointsLatch2.await();
+                       synchronized (ctx.getCheckpointLock()) {
+                               ctx.collect(LastElement);
+                       }
                }
 
                @Override
@@ -563,6 +573,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                        LOG.debug("Checkpoint {} completed.", checkpointId);
                        CompletedCheckpointsLatch.countDown();
+                       CompletedCheckpointsLatch2.countDown();
                }
        }
 

Reply via email to