This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73e36698e61a326babe733b6b2397b3a2a2484f3
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Mon Aug 20 11:19:18 2018 +0200

    [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
    
    Remove the asynchronous callback from ZooKeeperStateHandleStore#releaseAndTryRemove.
    Instead we can execute the callback after having executed the releaseAndTryRemove
    method successfully. This separates concerns better because we don't mix storage
    with business logic. Furthermore, we can still avoid blocking operations if we use a
    separate thread to call into ZooKeeperStateHandleStore#releaseAndTryRemove.
---
 .../services/ZooKeeperMesosServices.java           |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java         | 101 +++-----
 .../zookeeper/ZooKeeperHaServices.java             |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java           |   7 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   8 +-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 156 ++---------
 .../runtime/zookeeper/ZooKeeperUtilityFactory.java |  14 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |   9 +-
 ...KeeperCompletedCheckpointStoreMockitoTest.java} |  19 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 288 +++++++--------------
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    |  14 +-
 .../flink/runtime/zookeeper/ZooKeeperResource.java |  72 ++++++
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 115 +++-----
 13 files changed, 283 insertions(+), 525 deletions(-)

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb83..45d1141 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public class ZooKeeperMesosServices extends 
AbstractMesosServices {
 
                ZooKeeperStateHandleStore<MesosWorkerStore.Worker> 
zooKeeperStateHandleStore = 
zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
                        "/workers",
-                       stateStorageHelper,
-                       executor);
+                       stateStorageHelper);
 
                ZooKeeperSharedValue frameworkId = 
zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
                ZooKeeperSharedCount totalTaskCount = 
zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index deb1ab3..1317339 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@ import 
org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         */
        private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+       private final Executor executor;
+
        /**
         * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
         *
@@ -98,7 +99,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         *                                       start with a '/')
         * @param stateStorage                   State storage to be used to 
persist the completed
         *                                       checkpoint
-        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
+        * @param executor to execute blocking calls
         * @throws Exception
         */
        public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                // All operations will have the path as root
                this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
                this.completedCheckpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+               this.executor = checkNotNull(executor);
+
                LOG.info("Initialized in '{}'.", checkpointsPath);
        }
 
@@ -236,16 +239,30 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
-                       try {
-                               
removeSubsumed(completedCheckpoints.removeFirst());
-                       } catch (Exception e) {
-                               LOG.warn("Failed to subsume the old 
checkpoint", e);
-                       }
+                       final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
+                       tryRemoveCompletedCheckpoint(completedCheckpoint, 
CompletedCheckpoint::discardOnSubsume);
                }
 
                LOG.debug("Added {} to {}.", checkpoint, path);
        }
 
+       private void tryRemoveCompletedCheckpoint(CompletedCheckpoint 
completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> 
discardCallback) {
+               try {
+                       if (tryRemove(completedCheckpoint.getCheckpointID())) {
+                               executor.execute(() -> {
+                                       try {
+                                               
discardCallback.accept(completedCheckpoint);
+                                       } catch (Exception e) {
+                                               LOG.warn("Could not discard 
completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+                                       }
+                               });
+
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Failed to subsume the old checkpoint", e);
+               }
+       }
+
        @Override
        public CompletedCheckpoint getLatestCheckpoint() {
                if (completedCheckpoints.isEmpty()) {
@@ -278,11 +295,9 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        LOG.info("Shutting down");
 
                        for (CompletedCheckpoint checkpoint : 
completedCheckpoints) {
-                               try {
-                                       removeShutdown(checkpoint, jobStatus);
-                               } catch (Exception e) {
-                                       LOG.error("Failed to discard 
checkpoint.", e);
-                               }
+                               tryRemoveCompletedCheckpoint(
+                                       checkpoint,
+                                       completedCheckpoint -> 
completedCheckpoint.discardOnShutdown(jobStatus));
                        }
 
                        completedCheckpoints.clear();
@@ -305,59 +320,13 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        // 
------------------------------------------------------------------------
 
        /**
-        * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-        */
-       private void removeSubsumed(
-               final CompletedCheckpoint completedCheckpoint) throws Exception 
{
-
-               if (completedCheckpoint == null) {
-                       return;
-               }
-
-               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
action =
-                       new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                               @Override
-                               public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                                       if (value != null) {
-                                               try {
-                                                       
completedCheckpoint.discardOnSubsume();
-                                               } catch (Exception e) {
-                                                       throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
-                                               }
-                                       }
-                               }
-                       };
-
-               checkpointsInZooKeeper.releaseAndTryRemove(
-                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-                       action);
-       }
-
-       /**
-        * Removes a checkpoint from ZooKeeper because of Job shutdown and 
drops the state.
+        * Tries to remove the checkpoint identified by the given checkpoint id.
+        *
+        * @param checkpointId identifying the checkpoint to remove
+        * @return true if the checkpoint could be removed
         */
-       private void removeShutdown(
-                       final CompletedCheckpoint completedCheckpoint,
-                       final JobStatus jobStatus) throws Exception {
-
-               if (completedCheckpoint == null) {
-                       return;
-               }
-
-               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
removeAction = new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                       @Override
-                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                               try {
-                                       
completedCheckpoint.discardOnShutdown(jobStatus);
-                               } catch (Exception e) {
-                                       throw new FlinkException("Could not 
discard the completed checkpoint on subsume.", e);
-                               }
-                       }
-               };
-
-               checkpointsInZooKeeper.releaseAndTryRemove(
-                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-                       removeAction);
+       private boolean tryRemove(long checkpointId) throws Exception {
+               return 
checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479..ea96d7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
 
        @Override
        public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
-               return ZooKeeperUtils.createSubmittedJobGraphs(client, 
configuration, executor);
+               return ZooKeeperUtils.createSubmittedJobGraphs(client, 
configuration);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d48..0510815 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -99,14 +98,12 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
         * @param client ZooKeeper client
         * @param currentJobsPath ZooKeeper path for current job graphs
         * @param stateStorage State storage used to persist the submitted jobs
-        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
         * @throws Exception
         */
        public ZooKeeperSubmittedJobGraphStore(
                        CuratorFramework client,
                        String currentJobsPath,
-                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage,
-                       Executor executor) throws Exception {
+                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage) throws Exception {
 
                checkNotNull(currentJobsPath, "Current jobs path");
                checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + currentJobsPath);
 
                this.zooKeeperFullBasePath = client.getNamespace() + 
currentJobsPath;
-               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage);
 
                this.pathCache = new PathChildrenCache(facade, "/", false);
                pathCache.getListenable().addListener(new 
SubmittedJobGraphsPathCacheListener());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e..d9c9161 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -227,14 +227,12 @@ public class ZooKeeperUtils {
         *
         * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
         * @param configuration {@link Configuration} object
-        * @param executor to run ZooKeeper callbacks
         * @return {@link ZooKeeperSubmittedJobGraphStore} instance
         * @throws Exception if the submitted job graph store cannot be created
         */
        public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
                        CuratorFramework client,
-                       Configuration configuration,
-                       Executor executor) throws Exception {
+                       Configuration configuration) throws Exception {
 
                checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,9 @@ public class ZooKeeperUtils {
                String zooKeeperSubmittedJobsPath = 
configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
                return new ZooKeeperSubmittedJobGraphStore(
-                               client, zooKeeperSubmittedJobsPath, 
stateStorage, executor);
+                       client,
+                       zooKeeperSubmittedJobsPath,
+                       stateStorage);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index e151a11..8c3d31f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -21,14 +21,9 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -46,7 +41,6 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -91,8 +85,6 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
        private final RetrievableStateStorageHelper<T> storage;
 
-       private final Executor executor;
-
        /** Lock node name of this ZooKeeperStateHandleStore. The name should 
be unique among all other state handle stores. */
        private final String lockNode;
 
@@ -105,16 +97,13 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         *                            instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code>
         * @param storage to persist the actual state and whose returned state 
handle is then written
         *                to ZooKeeper
-        * @param executor to run the ZooKeeper callbacks
         */
        public ZooKeeperStateHandleStore(
                CuratorFramework client,
-               RetrievableStateStorageHelper<T> storage,
-               Executor executor) {
+               RetrievableStateStorageHelper<T> storage) {
 
                this.client = checkNotNull(client, "Curator client");
                this.storage = checkNotNull(storage, "State storage");
-               this.executor = checkNotNull(executor);
 
                // Generate a unique lock node name
                lockNode = UUID.randomUUID().toString();
@@ -395,33 +384,14 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
        /**
         * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
-        * The deletion of the state node is executed asynchronously.
-        *
-        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
-        * has been executed.
-        *
-        * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
-        * @throws Exception If the ZooKeeper operation fails
-        */
-       public void releaseAndTryRemove(String pathInZooKeeper) throws 
Exception {
-               releaseAndTryRemove(pathInZooKeeper, null);
-       }
-
-       /**
-        * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
-        * The deletion of the state node is executed asynchronously. After the 
state node has been deleted, the given
-        * callback is called with the {@link RetrievableStateHandle} of the 
deleted state node.
-        *
-        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
-        * has been executed.
+        * It returns the {@link RetrievableStateHandle} stored under the given 
state node if any.
         *
         * @param pathInZooKeeper Path of state handle to remove
-        * @param callback The callback to execute after a successful deletion. 
Null if no action needs to be executed.
-        * @throws Exception If the ZooKeeper operation fails
+        * @return True if the state handle could be released
+        * @throws Exception If the ZooKeeper operation or discarding the state 
handle fails
         */
-       public void releaseAndTryRemove(
-                       String pathInZooKeeper,
-                       @Nullable final RemoveCallback<T> callback) throws 
Exception {
+       @Nullable
+       public boolean releaseAndTryRemove(String pathInZooKeeper) throws 
Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
                final String path = normalizePath(pathInZooKeeper);
@@ -431,14 +401,23 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                try {
                        stateHandle = get(path, false);
                } catch (Exception e) {
-                       LOG.warn("Could not retrieve the state handle from node 
" + path + '.', e);
+                       LOG.warn("Could not retrieve the state handle from node 
{}.", path, e);
                }
 
                release(pathInZooKeeper);
 
-               final BackgroundCallback backgroundCallback = new 
RemoveBackgroundCallback<>(stateHandle, callback, path);
+               try {
+                       client.delete().forPath(path);
+               } catch (KeeperException.NotEmptyException ignored) {
+                       LOG.debug("Could not delete znode {} because it is 
still locked.", path);
+                       return false;
+               }
+
+               if (stateHandle != null) {
+                       stateHandle.discardState();
+               }
 
-               client.delete().inBackground(backgroundCallback, 
executor).forPath(path);
+               return true;
        }
 
        /**
@@ -597,103 +576,4 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                        return '/' + path;
                }
        }
-
-       // 
---------------------------------------------------------------------------------------------------------
-       // Utility classes
-       // 
---------------------------------------------------------------------------------------------------------
-
-       /**
-        * Callback which is executed when removing a node from ZooKeeper. The 
callback will call the given
-        * {@link RemoveCallback} if it is not null. Afterwards, it will 
discard the given {@link RetrievableStateHandle}
-        * if it is not null.
-        *
-        * @param <T> Type of the value stored in the RetrievableStateHandle
-        */
-       private static final class RemoveBackgroundCallback<T extends 
Serializable> implements BackgroundCallback {
-               @Nullable
-               private final RetrievableStateHandle<T> stateHandle;
-
-               @Nullable
-               private final RemoveCallback<T> callback;
-
-               private final String pathInZooKeeper;
-
-               private RemoveBackgroundCallback(
-                       @Nullable RetrievableStateHandle<T> stateHandle,
-                       @Nullable RemoveCallback<T> callback,
-                       String pathInZooKeeper) {
-
-                       this.stateHandle = stateHandle;
-                       this.callback = callback;
-                       this.pathInZooKeeper = 
Preconditions.checkNotNull(pathInZooKeeper);
-               }
-
-               @Override
-               public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception {
-                       try {
-                               if (event.getType() == CuratorEventType.DELETE) 
{
-                                       final KeeperException.Code resultCode = 
KeeperException.Code.get(event.getResultCode());
-
-                                       if (resultCode == 
KeeperException.Code.OK) {
-                                               Exception exception = null;
-
-                                               if (null != callback) {
-                                                       try {
-                                                               
callback.apply(stateHandle);
-                                                       } catch (Throwable e) {
-                                                               exception = new 
Exception("Could not execute delete action for node " +
-                                                                       
pathInZooKeeper + '.', e);
-                                                       }
-                                               }
-
-                                               if (stateHandle != null) {
-                                                       try {
-                                                               // Discard the 
state handle
-                                                               
stateHandle.discardState();
-                                                       } catch (Throwable e) {
-                                                               Exception 
newException = new Exception("Could not discard state handle of node " +
-                                                                       
pathInZooKeeper + '.', e);
-
-                                                               if (exception 
== null) {
-                                                                       
exception = newException;
-                                                               } else {
-                                                                       
exception.addSuppressed(newException);
-                                                               }
-                                                       }
-                                               }
-
-                                               if (exception != null) {
-                                                       throw exception;
-                                               }
-                                       } else if (resultCode == 
KeeperException.Code.NOTEMPTY) {
-                                               // Could not delete the node 
because it still contains children/locks
-                                               LOG.debug("Could not delete 
node " + pathInZooKeeper + " because it is still locked.");
-                                       } else {
-                                               throw new 
IllegalStateException("Unexpected result code " +
-                                                       resultCode.name() + " 
in '" + event + "' callback.");
-                                       }
-                               } else {
-                                       throw new 
IllegalStateException("Unexpected event type " +
-                                               event.getType() + " in '" + 
event + "' callback.");
-                               }
-                       } catch (Exception e) {
-                               LOG.warn("Failed to run callback for delete 
operation on node " + pathInZooKeeper + '.', e);
-                       }
-
-               }
-       }
-
-       /**
-        * Callback interface for remove calls.
-        */
-       public interface RemoveCallback<T extends Serializable> {
-               /**
-                * Callback method. The parameter can be null if the {@link 
RetrievableStateHandle} could not be retrieved
-                * from ZooKeeper.
-                *
-                * @param value RetrievableStateHandle retrieved from 
ZooKeeper, null if it was not retrievable
-                * @throws FlinkException If the callback failed
-                */
-               void apply(@Nullable RetrievableStateHandle<T> value) throws 
FlinkException;
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5..3e294e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link 
CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public class ZooKeeperUtilityFactory {
         *
         * @param zkStateHandleStorePath specifying the path in ZooKeeper to 
store the state handles to
         * @param stateStorageHelper storing the actual state data
-        * @param executor to run asynchronous callbacks of the state handle 
store
         * @param <T> Type of the state to be stored
         * @return a ZooKeeperStateHandleStore instance
         * @throws Exception if ZooKeeper could not create the provided state 
handle store path in
@@ -79,8 +78,7 @@ public class ZooKeeperUtilityFactory {
         */
        public <T extends Serializable> ZooKeeperStateHandleStore<T> 
createZooKeeperStateHandleStore(
                        String zkStateHandleStorePath,
-                       RetrievableStateStorageHelper<T> stateStorageHelper,
-                       Executor executor) throws Exception {
+                       RetrievableStateStorageHelper<T> stateStorageHelper) 
throws Exception {
 
                
facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
                CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public class ZooKeeperUtilityFactory {
                                facade.getNamespace(),
                                zkStateHandleStorePath));
 
-               return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, 
stateStorageHelper, executor);
+               return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, 
stateStorageHelper);
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 8156964..c4d8903 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
 
        // 
---------------------------------------------------------------------------------------------
 
-       protected TestCompletedCheckpoint createCheckpoint(
+       public static TestCompletedCheckpoint createCheckpoint(
                int id,
                SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                }
        }
 
-       protected void verifyCheckpointDiscarded(Collection<OperatorState> 
operatorStates) {
+       public static void verifyCheckpointDiscarded(TestCompletedCheckpoint 
completedCheckpoint) {
+               assertTrue(completedCheckpoint.isDiscarded());
+               
verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+       }
+
+       protected static void 
verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
                for (OperatorState operatorState : operatorStates) {
                        for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
                                
Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
similarity index 95%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
copy to 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 0384733..1f7d369 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -66,20 +66,11 @@ import static 
org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
- * Tests for {@link ZooKeeperCompletedCheckpointStore}.
+ * Mockito based tests for the {@link ZooKeeperCompletedCheckpointStore}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
-public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
-
-       @Test
-       public void testPathConversion() {
-               final long checkpointId = 42L;
-
-               final String path = 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
-
-               assertEquals(checkpointId, 
ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
-       }
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 
        /**
         * Tests that the completed checkpoint store can retrieve all 
checkpoints stored in ZooKeeper
@@ -133,7 +124,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
                final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
-               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
+               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
                
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
                
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
@@ -221,7 +212,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
                ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock =
-                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
+                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
                
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
 
                doAnswer(new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@@ -236,7 +227,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                        }
                }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), 
any(CompletedCheckpoint.class));
 
-               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(),
 any(ZooKeeperStateHandleStore.RemoveCallback.class));
+               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
 
                final int numCheckpointsToRetain = 1;
                final String checkpointsPath = "foobar";
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733..f992d3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+       @ClassRule
+       public static ZooKeeperResource zooKeeperResource = new 
ZooKeeperResource();
+
        @Test
        public void testPathConversion() {
                final long checkpointId = 42L;
@@ -82,188 +61,103 @@ public class ZooKeeperCompletedCheckpointStoreTest 
extends TestLogger {
        }
 
        /**
-        * Tests that the completed checkpoint store can retrieve all 
checkpoints stored in ZooKeeper
-        * and ignores those which cannot be retrieved via their state handles.
-        *
-        * <p>We have a timeout in case the ZooKeeper store get's into a 
deadlock/livelock situation.
+        * Tests that subsumed checkpoints are discarded.
         */
-       @Test(timeout = 50000)
-       public void testCheckpointRecovery() throws Exception {
-               final JobID jobID = new JobID();
-               final long checkpoint1Id = 1L;
-               final long checkpoint2Id = 2;
-               final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-               final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-               expectedCheckpointIds.add(1L);
-               expectedCheckpointIds.add(2L);
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-               
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-               when(retrievableStateHandle1.retrieveState()).then(
-                       (invocation) -> new CompletedCheckpoint(
-                               jobID,
-                               checkpoint1Id,
-                               1L,
-                               1L,
-                               new HashMap<>(),
-                               null,
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               new TestCompletedCheckpointStorageLocation()));
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-               when(retrievableStateHandle2.retrieveState()).then(
-                       (invocation -> new CompletedCheckpoint(
-                               jobID,
-                               checkpoint2Id,
-                               2L,
-                               2L,
-                               new HashMap<>(),
-                               null,
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               new TestCompletedCheckpointStorageLocation())));
-
-               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
-               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
-               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, 
"/foobar2"));
-               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing2"));
-
-               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
-               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-               final int numCheckpointsToRetain = 1;
-
-               // Mocking for the delete operation on the CuratorFramework 
client
-               // It assures that the callback is executed synchronously
-
-               final EnsurePath ensurePathMock = mock(EnsurePath.class);
-               final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-               
when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-               when(curatorEventMock.getResultCode()).thenReturn(0);
-               
when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-               when(
-                       client
-                               .delete()
-                               .inBackground(any(BackgroundCallback.class), 
any(Executor.class))
-               ).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-                       @Override
-                       public ErrorListenerPathable<Void> 
answer(InvocationOnMock invocation) throws Throwable {
-                               final BackgroundCallback callback = 
(BackgroundCallback) invocation.getArguments()[0];
+       @Test
+       public void testDiscardingSubsumedCheckpoints() throws Exception {
+               final SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
+
+               final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+               final ZooKeeperCompletedCheckpointStore checkpointStore = 
createZooKeeperCheckpointStore(client);
+
+               try {
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = 
CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+                       checkpointStore.addCheckpoint(checkpoint1);
+                       assertThat(checkpointStore.getAllCheckpoints(), 
Matchers.contains(checkpoint1));
+
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = 
CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+                       checkpointStore.addCheckpoint(checkpoint2);
+                       final List<CompletedCheckpoint> allCheckpoints = 
checkpointStore.getAllCheckpoints();
+                       assertThat(allCheckpoints, 
Matchers.contains(checkpoint2));
+                       assertThat(allCheckpoints, 
Matchers.not(Matchers.contains(checkpoint1)));
+
+                       // verify that the subsumed checkpoint is discarded
+                       
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+               } finally {
+                       client.close();
+               }
+       }
 
-                               ErrorListenerPathable<Void> result = 
mock(ErrorListenerPathable.class);
+       /**
+        * Tests that checkpoints are discarded when the completed checkpoint 
store is shut
+        * down with a globally terminal state.
+        */
+       @Test
+       public void testDiscardingCheckpointsAtShutDown() throws Exception {
+               final SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
 
-                               
when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-                                       @Override
-                                       public Void answer(InvocationOnMock 
invocation) throws Throwable {
+               final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+               final ZooKeeperCompletedCheckpointStore checkpointStore = 
createZooKeeperCheckpointStore(client);
 
-                                               callback.processResult(client, 
curatorEventMock);
+               try {
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = 
CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-                                               return null;
-                                       }
-                               });
+                       checkpointStore.addCheckpoint(checkpoint1);
+                       assertThat(checkpointStore.getAllCheckpoints(), 
Matchers.contains(checkpoint1));
 
-                               return result;
-                       }
-               });
+                       checkpointStore.shutdown(JobStatus.FINISHED);
 
-               final String checkpointsPath = "foobar";
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage = mock(RetrievableStateStorageHelper.class);
+                       // verify that the checkpoint is discarded
+                       
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+               } finally {
+                       client.close();
+               }
+       }
 
-               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-                       numCheckpointsToRetain,
+       @Nonnull
+       private ZooKeeperCompletedCheckpointStore 
createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+               return new ZooKeeperCompletedCheckpointStore(
+                       1,
                        client,
-                       checkpointsPath,
-                       stateStorage,
+                       "/checkpoints",
+                       new TestingRetrievableStateStorageHelper<>(),
                        Executors.directExecutor());
+       }
 
-               zooKeeperCompletedCheckpointStore.recover();
-
-               CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-               // check that we return the latest retrievable checkpoint
-               // this should remove the latest checkpoint because it is broken
-               assertEquals(checkpoint2Id, 
latestCompletedCheckpoint.getCheckpointID());
-
-               // this should remove the second broken checkpoint because 
we're iterating over all checkpoints
-               List<CompletedCheckpoint> completedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-               Collection<Long> actualCheckpointIds = new 
HashSet<>(completedCheckpoints.size());
-
-               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
-                       
actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+       private static final class TestingRetrievableStateStorageHelper<T 
extends Serializable> implements RetrievableStateStorageHelper<T> {
+               @Override
+               public RetrievableStateHandle<T> store(T state) {
+                       return new TestingRetrievableStateHandle<>(state);
                }
 
-               assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-               // check that we did not discard any of the state handles
-               verify(retrievableStateHandle1, never()).discardState();
-               verify(retrievableStateHandle2, never()).discardState();
+               private static class TestingRetrievableStateHandle<T extends 
Serializable> implements RetrievableStateHandle<T> {
 
-               // Make sure that we also didn't discard any of the broken 
handles. Only when checkpoints
-               // are subsumed should they be discarded.
-               verify(failingRetrievableStateHandle, never()).discardState();
-       }
+                       private static final long serialVersionUID = 
137053380713794300L;
 
-       /**
-        * Tests that the checkpoint does not exist in the store when we fail 
to add
-        * it into the store (i.e., there exists an exception thrown by the 
method).
-        */
-       @Test
-       public void testAddCheckpointWithFailedRemove() throws Exception {
-               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
+                       private final T state;
 
-               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock =
-                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
-               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+                       private TestingRetrievableStateHandle(T state) {
+                               this.state = state;
+                       }
 
-               doAnswer(new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
                        @Override
-                       public RetrievableStateHandle<CompletedCheckpoint> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               CompletedCheckpoint checkpoint = 
(CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-                               RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle = mock(RetrievableStateHandle.class);
-                               
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-                               return retrievableStateHandle;
+                       public T retrieveState() throws IOException, 
ClassNotFoundException {
+                               return state;
                        }
-               }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), 
any(CompletedCheckpoint.class));
-
-               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(),
 any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-               final int numCheckpointsToRetain = 1;
-               final String checkpointsPath = "foobar";
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-                       numCheckpointsToRetain,
-                       client,
-                       checkpointsPath,
-                       stateSotrage,
-                       Executors.directExecutor());
+                       @Override
+                       public void discardState() throws Exception {
+                               // no op
+                       }
 
-               for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-                       CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
-                       doReturn(i).when(checkpointToAdd).getCheckpointID();
-                       
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-                       try {
-                               
zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-                               // The checkpoint should be in the store if we 
successfully add it into the store.
-                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                               
assertTrue(addedCheckpoints.contains(checkpointToAdd));
-                       } catch (Exception e) {
-                               // The checkpoint should not be in the store if 
any exception is thrown.
-                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                               
assertFalse(addedCheckpoints.contains(checkpointToAdd));
+                       @Override
+                       public long getStateSize() {
+                               return 0;
                        }
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b53..e9be145 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
                        ZooKeeper.createClient(),
                        "/testPutAndRemoveJobGraph",
-                       localStateStorage,
-                       Executors.directExecutor());
+                       localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        @Test
        public void testRecoverJobGraphs() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
 
                try {
                        jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
                        otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
 
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
        @Test(expected = IllegalStateException.class)
        public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                ZooKeeperSubmittedJobGraphStore otherJobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                jobGraphs.start(null);
                otherJobGraphs.start(null);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 0000000..c4c5694
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link 
org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperResource.class);
+
+       @Nullable
+       private TestingServer zooKeeperServer;
+
+       public String getConnectString() {
+               verifyIsRunning();
+               return zooKeeperServer.getConnectString();
+       }
+
+       private void verifyIsRunning() {
+               Preconditions.checkState(zooKeeperServer != null);
+       }
+
+       @Override
+       protected void before() throws Throwable {
+               terminateZooKeeperServer();
+               zooKeeperServer = new TestingServer(true);
+       }
+
+       private void terminateZooKeeperServer() throws IOException {
+               if (zooKeeperServer != null) {
+                       zooKeeperServer.close(); // close() stops the server AND deletes its temp directory; stop() would leave the directory behind
+                       zooKeeperServer = null;
+               }
+       }
+
+       @Override
+       protected void after() {
+               try {
+                       terminateZooKeeperServer();
+               } catch (IOException e) {
+                       LOG.warn("Could not properly terminate the {}.", 
getClass().getSimpleName(), e);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25..2dd27e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger 
{
        @Test
        public void testAddAndLock() throws Exception {
                LongStateStorage longStateStorage = new LongStateStorage();
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZOOKEEPER.getClient(), longStateStorage, 
Executors.directExecutor());
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(), longStateStorage);
 
                // Config
                final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                
ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                when(client.inTransaction().create()).thenThrow(new 
RuntimeException("Expected test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
+                               client, stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = 
"/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateStorage, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateStorage);
 
                store.replace("/testReplaceNonExistingPath", 0, 1L);
        }
@@ -247,7 +239,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
+                               client, stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = 
"/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                store.getAndLock("/testGetNonExistingPath");
        }
@@ -328,7 +320,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                store.addAndLock(pathInZooKeeper, state);
 
+               final int numberOfGlobalDiscardCalls = 
LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
                // Test
                store.releaseAndTryRemove(pathInZooKeeper);
 
                // Verify discarded
                assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
-       }
-
-       /**
-        * Tests that state handles are correctly removed with a callback.
-        */
-       @Test
-       public void testRemoveWithCallback() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testRemoveWithCallback";
-               final Long state = 27255442L;
-
-               store.addAndLock(pathInZooKeeper, state);
-
-               final CountDownLatch sync = new CountDownLatch(1);
-               ZooKeeperStateHandleStore.RemoveCallback<Long> callback = 
mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               sync.countDown();
-                               return null;
-                       }
-               }).when(callback).apply(any(RetrievableStateHandle.class));
-
-               // Test
-               store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-               // Verify discarded and callback called
-               assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-               sync.await();
-
-               verify(callback, times(1))
-                               .apply(any(RetrievableStateHandle.class));
+               assertEquals(numberOfGlobalDiscardCalls + 1, 
LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
        }
 
        /** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       stateStorage,
-                       Executors.directExecutor());
+                       stateStorage);
 
                final Collection<Long> input = new HashSet<>();
                input.add(1L);
@@ -543,13 +498,11 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String path = "/state";
 
@@ -649,8 +600,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                        ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                                client,
-                               longStateStorage,
-                               Executors.directExecutor());
+                               longStateStorage);
 
                        final String path = "/state";
 
@@ -682,8 +632,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String path = "/state";
 
@@ -720,8 +669,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final Collection<String> paths = Arrays.asList("/state1", 
"/state2", "/state3");
 
@@ -775,9 +723,11 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                private static final long serialVersionUID = 
-3555329254423838912L;
 
+               private static int numberOfGlobalDiscardCalls = 0;
+
                private final Long state;
 
-               private int numberOfDiscardCalls;
+               private int numberOfDiscardCalls = 0;
 
                public LongRetrievableStateHandle(Long state) {
                        this.state = state;
@@ -790,6 +740,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
 
                @Override
                public void discardState() throws Exception {
+                       numberOfGlobalDiscardCalls++;
                        numberOfDiscardCalls++;
                }
 
@@ -798,8 +749,12 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                        return 0;
                }
 
-               public int getNumberOfDiscardCalls() {
+               int getNumberOfDiscardCalls() {
                        return numberOfDiscardCalls;
                }
+
+               public static int getNumberOfGlobalDiscardCalls() {
+                       return numberOfGlobalDiscardCalls;
+               }
        }
 }

Reply via email to