[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

In order to guard against deletions of ZooKeeper nodes which are still being 
used
by a different ZooKeeperStateHandleStore, we have to introduce a locking 
mechanism.
Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is
allowed to be deleted.

THe locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, 
thus,
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it does not have any children associated.

In order to guard against oprhaned lock nodes, they are created as ephemeral 
nodes.
This means that they will be deleted by ZooKeeper once the connection of the
ZooKeeper client which created the node timed out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f58fec70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f58fec70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f58fec70

Branch: refs/heads/release-1.3
Commit: f58fec70fef12056bd58b6cc2985532ccb07625e
Parents: 0963718
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed May 17 14:52:04 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri May 19 11:00:07 2017 +0200

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        |   8 +-
 .../ZooKeeperCompletedCheckpointStore.java      | 150 ++--
 .../ZooKeeperSubmittedJobGraphStore.java        |  50 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 419 +++++++---
 .../CompletedCheckpointStoreTest.java           |   9 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  11 +-
 .../ZooKeeperStateHandleStoreITCase.java        | 642 ---------------
 .../ZooKeeperStateHandleStoreTest.java          | 805 +++++++++++++++++++
 9 files changed, 1345 insertions(+), 882 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 42abd4c..663ce56 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
                                totalTaskCountInZooKeeper.close();
 
                                if(cleanup) {
-                                       
workersInZooKeeper.removeAndDiscardAllState();
+                                       
workersInZooKeeper.releaseAndTryRemoveAll();
                                }
 
                                isRunning = false;
@@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
                synchronized (startStopLock) {
                        verifyIsRunning();
 
-                       List<Tuple2<RetrievableStateHandle<Worker>, String>> 
handles = workersInZooKeeper.getAll();
+                       List<Tuple2<RetrievableStateHandle<Worker>, String>> 
handles = workersInZooKeeper.getAllAndLock();
 
                        if(handles.size() != 0) {
                                List<MesosWorkerStore.Worker> workers = new 
ArrayList<>(handles.size());
@@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
                        int currentVersion = workersInZooKeeper.exists(path);
                        if (currentVersion == -1) {
                                try {
-                                       workersInZooKeeper.add(path, worker);
+                                       workersInZooKeeper.addAndLock(path, 
worker);
                                        LOG.debug("Added {} in ZooKeeper.", 
worker);
                                } catch (KeeperException.NodeExistsException 
ex) {
                                        throw new 
ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
@@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
                                return false;
                        }
 
-                       workersInZooKeeper.removeAndDiscardState(path);
+                       workersInZooKeeper.releaseAndTryRemove(path);
                        LOG.debug("Removed worker {} from ZooKeeper.", taskID);
                        return true;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 95cfb0f..084d93e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,12 +31,12 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -155,7 +152,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
                while (true) {
                        try {
-                               initialCheckpoints = 
checkpointsInZooKeeper.getAllSortedByName();
+                               initialCheckpoints = 
checkpointsInZooKeeper.getAllSortedByNameAndLock();
                                break;
                        }
                        catch (ConcurrentModificationException e) {
@@ -178,7 +175,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                                        "checkpoint store.", e);
 
                                // remove the checkpoint with broken state 
handle
-                               removeBrokenStateHandle(checkpointStateHandle);
+                               
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
                        }
 
                        if (completedCheckpoint != null) {
@@ -201,7 +198,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
                // First add the new one. If it fails, we don't want to loose 
existing data.
-               stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+               stateHandle = checkpointsInZooKeeper.addAndLock(path, 
checkpoint);
 
                checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
 
@@ -211,7 +208,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (checkpointStateHandles.size() > 
maxNumberOfCheckpointsToRetain) {
                        try {
-                               
removeSubsumed(checkpointStateHandles.removeFirst(), sharedStateRegistry);
+                               
removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
                        } catch (Exception e) {
                                LOG.warn("Failed to subsume the old 
checkpoint", e);
                        }
@@ -237,7 +234,8 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                                        try {
                                                // remove the checkpoint with 
broken state handle
-                                               
removeBrokenStateHandle(checkpointStateHandles.pollLast());
+                                               
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = 
checkpointStateHandles.pollLast();
+                                               
removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
                                        } catch (Exception removeException) {
                                                LOG.warn("Could not remove the 
latest checkpoint with a broken state handle.", removeException);
                                        }
@@ -265,7 +263,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                                // remove the checkpoint with broken state 
handle
                                stateHandleIterator.remove();
-                               removeBrokenStateHandle(stateHandlePath);
+                               removeBrokenStateHandle(stateHandlePath.f1, 
stateHandlePath.f0);
                        }
                }
 
@@ -289,7 +287,7 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                        for 
(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : 
checkpointStateHandles) {
                                try {
-                                       removeShutdown(checkpoint, jobStatus, 
sharedStateRegistry);
+                                       removeShutdown(checkpoint.f1, 
jobStatus, sharedStateRegistry);
                                } catch (Exception e) {
                                        LOG.error("Failed to discard 
checkpoint.", e);
                                }
@@ -306,117 +304,87 @@ public class ZooKeeperCompletedCheckpointStore extends 
AbstractCompletedCheckpoi
 
                        // Clear the local handles, but don't remove any state
                        checkpointStateHandles.clear();
+
+                       // Release the state handle locks in ZooKeeper such 
that they can be deleted
+                       checkpointsInZooKeeper.releaseAll();
                }
        }
 
        // 
------------------------------------------------------------------------
 
        private void removeSubsumed(
-               final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String> stateHandleAndPath,
+               final String pathInZooKeeper,
                final SharedStateRegistry sharedStateRegistry) throws Exception 
{
                
-               Callable<Void> action = new Callable<Void>() {
+               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                        @Override
-                       public Void call() throws Exception {
-                               CompletedCheckpoint completedCheckpoint = 
retrieveCompletedCheckpoint(stateHandleAndPath);
-                               
-                               if (completedCheckpoint != null) {
-                                       
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
-                               }
+                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+                               if (value != null) {
+                                       final CompletedCheckpoint 
completedCheckpoint;
+                                       try {
+                                               completedCheckpoint = 
value.retrieveState();
+                                       } catch (Exception e) {
+                                               throw new FlinkException("Could 
not retrieve the completed checkpoint from the given state handle.", e);
+                                       }
 
-                               return null;
+                                       if (completedCheckpoint != null) {
+                                               try {
+                                                       
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+                                               } catch (Exception e) {
+                                                       throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
+                                               }
+                                       }
+                               }
                        }
                };
 
-               remove(stateHandleAndPath, action);
+               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, 
action);
        }
 
        private void removeShutdown(
-                       final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+                       final String pathInZooKeeper,
                        final JobStatus jobStatus,
                        final SharedStateRegistry sharedStateRegistry) throws 
Exception {
 
-               Callable<Void> action = new Callable<Void>() {
+               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
removeAction = new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                        @Override
-                       public Void call() throws Exception {
-                               CompletedCheckpoint completedCheckpoint = 
retrieveCompletedCheckpoint(stateHandleAndPath);
-                               
-                               if (completedCheckpoint != null) {
-                                       
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-                               }
+                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+                               if (value != null) {
+                                       final CompletedCheckpoint 
completedCheckpoint;
+
+                                       try {
+                                               completedCheckpoint = 
value.retrieveState();
+                                       } catch (Exception e) {
+                                               throw new FlinkException("Could 
not retrieve the completed checkpoint from the given state handle.", e);
+                                       }
 
-                               return null;
+                                       if (completedCheckpoint != null) {
+                                               try {
+                                                       
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+                                               } catch (Exception e) {
+                                                       throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
+                                               }
+                                       }
+                               }
                        }
                };
 
-               remove(stateHandleAndPath, action);
-       }
-
-       private void removeBrokenStateHandle(final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) 
throws Exception {
-               remove(stateHandleAndPath, null);
+               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, 
removeAction);
        }
 
-       /**
-        * Removes the state handle from ZooKeeper, discards the checkpoints, 
and the state handle.
-        */
-       private void remove(
-                       final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
-                       final Callable<Void> action) throws Exception {
-
-               BackgroundCallback callback = new BackgroundCallback() {
+       private void removeBrokenStateHandle(
+                       final String pathInZooKeeper,
+                       final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle) throws Exception {
+               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                        @Override
-                       public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
-                               final long checkpointId = 
pathToCheckpointId(stateHandleAndPath.f1);
-
+                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
                                try {
-                                       if (event.getType() == 
CuratorEventType.DELETE) {
-                                               if (event.getResultCode() == 0) 
{
-                                                       Exception exception = 
null;
-
-                                                       if (null != action) {
-                                                               try {
-                                                                       
action.call();
-                                                               } catch 
(Exception e) {
-                                                                       
exception = new Exception("Could not execute callable action " +
-                                                                               
"for checkpoint " + checkpointId + '.', e);
-                                                               }
-                                                       }
-
-                                                       try {
-                                                               // Discard the 
state handle
-                                                               
stateHandleAndPath.f0.discardState();
-                                                       } catch (Exception e) {
-                                                               Exception 
newException = new Exception("Could not discard meta " +
-                                                                       "data 
for completed checkpoint " + checkpointId + '.', e);
-
-                                                               if (exception 
== null) {
-                                                                       
exception = newException;
-                                                               } else {
-                                                                       
exception.addSuppressed(newException);
-                                                               }
-                                                       }
-
-                                                       if (exception != null) {
-                                                               throw exception;
-                                                       }
-                                               } else {
-                                                       throw new 
IllegalStateException("Unexpected result code " +
-                                                                       
event.getResultCode() + " in '" + event + "' callback.");
-                                               }
-                                       } else {
-                                               throw new 
IllegalStateException("Unexpected event type " +
-                                                               event.getType() 
+ " in '" + event + "' callback.");
-                                       }
+                                       retrievableStateHandle.discardState();
                                } catch (Exception e) {
-                                       LOG.warn("Failed to discard checkpoint 
{}.", checkpointId, e);
+                                       throw new FlinkException("Could not 
discard state handle.", e);
                                }
                        }
-               };
-
-               // Remove state handle from ZooKeeper first. If this fails, we 
can still recover, but if
-               // we remove a state handle and fail to remove it from 
ZooKeeper, we end up in an
-               // inconsistent state.
-               checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+               });
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2552088..fa972ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -157,36 +157,46 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        @Override
        public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
                checkNotNull(jobId, "Job ID");
-               String path = getPathForJob(jobId);
+               final String path = getPathForJob(jobId);
 
                LOG.debug("Recovering job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
 
                synchronized (cacheLock) {
                        verifyIsRunning();
 
-                       RetrievableStateHandle<SubmittedJobGraph> 
jobGraphRetrievableStateHandle;
+                       boolean success = false;
 
                        try {
-                               jobGraphRetrievableStateHandle = 
jobGraphsInZooKeeper.get(path);
-                       } catch (KeeperException.NoNodeException ignored) {
-                               return null;
-                       } catch (Exception e) {
-                               throw new Exception("Could not retrieve the 
submitted job graph state handle " +
-                                       "for " + path + "from the submitted job 
graph store.", e);
-                       }
-                       SubmittedJobGraph jobGraph;
+                               RetrievableStateHandle<SubmittedJobGraph> 
jobGraphRetrievableStateHandle;
 
-                       try {
-                               jobGraph = 
jobGraphRetrievableStateHandle.retrieveState();
-                       } catch (Exception e) {
-                               throw new Exception("Failed to retrieve the 
submitted job graph from state handle.", e);
-                       }
+                               try {
+                                       jobGraphRetrievableStateHandle = 
jobGraphsInZooKeeper.getAndLock(path);
+                               } catch (KeeperException.NoNodeException 
ignored) {
+                                       success = true;
+                                       return null;
+                               } catch (Exception e) {
+                                       throw new Exception("Could not retrieve 
the submitted job graph state handle " +
+                                               "for " + path + "from the 
submitted job graph store.", e);
+                               }
+                               SubmittedJobGraph jobGraph;
 
-                       addedJobGraphs.add(jobGraph.getJobId());
+                               try {
+                                       jobGraph = 
jobGraphRetrievableStateHandle.retrieveState();
+                               } catch (Exception e) {
+                                       throw new Exception("Failed to retrieve 
the submitted job graph from state handle.", e);
+                               }
 
-                       LOG.info("Recovered {}.", jobGraph);
+                               addedJobGraphs.add(jobGraph.getJobId());
 
-                       return jobGraph;
+                               LOG.info("Recovered {}.", jobGraph);
+
+                               success = true;
+                               return jobGraph;
+                       } finally {
+                               if (!success) {
+                                       jobGraphsInZooKeeper.release(path);
+                               }
+                       }
                }
        }
 
@@ -207,7 +217,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
 
                                if (currentVersion == -1) {
                                        try {
-                                               jobGraphsInZooKeeper.add(path, 
jobGraph);
+                                               
jobGraphsInZooKeeper.addAndLock(path, jobGraph);
 
                                                
addedJobGraphs.add(jobGraph.getJobId());
 
@@ -245,7 +255,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
 
                synchronized (cacheLock) {
                        if (addedJobGraphs.contains(jobId)) {
-                               
jobGraphsInZooKeeper.removeAndDiscardState(path);
+                               jobGraphsInZooKeeper.releaseAndTryRemove(path);
 
                                addedJobGraphs.remove(jobId);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 364ba0f..a548f1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -20,28 +20,38 @@ package org.apache.flink.runtime.zookeeper;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * State handles backed by ZooKeeper.
+ * Class which stores state via the provided {@link 
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ZooKeeper. The ZooKeeper node can be locked by 
creating an ephemeral
+ * child and only allowing the deletion of the ZooKeeper node if it does not 
have any children.
+ * That way we protect concurrent accesses from different 
ZooKeeperStateHandleStore instances.
  *
  * <p>Added state is persisted via {@link RetrievableStateHandle 
RetrievableStateHandles},
  * which in turn are written to ZooKeeper. This level of indirection is 
necessary to keep the
@@ -80,6 +90,9 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
        private final Executor executor;
 
+       /** Lock node name of this ZooKeeperStateHandleStore. The name should 
be unique among all other state handle stores. */
+       private final String lockNode;
+
        /**
         * Creates a {@link ZooKeeperStateHandleStore}.
         *
@@ -99,40 +112,36 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                this.client = checkNotNull(client, "Curator client");
                this.storage = checkNotNull(storage, "State storage");
                this.executor = checkNotNull(executor);
-       }
 
-       /**
-        * Creates a state handle and stores it in ZooKeeper with create mode 
{@link
-        * CreateMode#PERSISTENT}.
-        *
-        * @see #add(String, T, CreateMode)
-        */
-       public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
-               return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+               // Generate a unique lock node name
+               lockNode = UUID.randomUUID().toString();
        }
 
        /**
-        * Creates a state handle and stores it in ZooKeeper.
+        * Creates a state handle, stores it in ZooKeeper and locks it. A 
locked node cannot be removed by
+        * another {@link ZooKeeperStateHandleStore} instance as long as this 
instance remains connected
+        * to ZooKeeper.
         *
         * <p><strong>Important</strong>: This will <em>not</em> store the 
actual state in
         * ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
         * makes sure that data in ZooKeeper is small.
         *
-        * @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet and
-        *                        start with a '/')
+        * <p>The operation will fail if there is already an node under the 
given path
+        *
+        * @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet)
         * @param state           State to be added
-        * @param createMode      The create mode for the new path in ZooKeeper
         *
         * @return The Created {@link RetrievableStateHandle}.
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
-       public RetrievableStateHandle<T> add(
+       public RetrievableStateHandle<T> addAndLock(
                        String pathInZooKeeper,
-                       T state,
-                       CreateMode createMode) throws Exception {
+                       T state) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
 
+               final String path = normalizePath(pathInZooKeeper);
+
                RetrievableStateHandle<T> storeHandle = storage.store(state);
 
                boolean success = false;
@@ -145,7 +154,11 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                        // smaller than the state itself. This level of 
indirection makes sure that data in
                        // ZooKeeper is small, because ZooKeeper is designed 
for data in the KB range, but
                        // the state can be larger.
-                       
client.create().withMode(createMode).forPath(pathInZooKeeper, 
serializedStoreHandle);
+                       // Create the lock node in a transaction with the 
actual state node. That way we can prevent
+                       // race conditions with a concurrent delete operation.
+                       
client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, 
serializedStoreHandle)
+                               
.and().create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path))
+                               .and().commit();
 
                        success = true;
                        return storeHandle;
@@ -172,7 +185,9 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
 
-               RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
+               final String path = normalizePath(pathInZooKeeper);
+
+               RetrievableStateHandle<T> oldStateHandle = get(path, false);
 
                RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
@@ -185,7 +200,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                        // Replace state handle in ZooKeeper.
                        client.setData()
                                        .withVersion(expectedVersion)
-                                       .forPath(pathInZooKeeper, 
serializedStateHandle);
+                                       .forPath(path, serializedStateHandle);
                        success = true;
                } finally {
                        if(success) {
@@ -207,7 +222,9 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        public int exists(String pathInZooKeeper) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-               Stat stat = client.checkExists().forPath(pathInZooKeeper);
+               final String path = normalizePath(pathInZooKeeper);
+
+               Stat stat = client.checkExists().forPath(path);
 
                if (stat != null) {
                        return stat.getVersion();
@@ -217,32 +234,17 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        }
 
        /**
-        * Gets a state handle from ZooKeeper.
+        * Gets the {@link RetrievableStateHandle} stored in the given 
ZooKeeper node and locks it. A
+        * locked node cannot be removed by another {@link 
ZooKeeperStateHandleStore} instance as long
+        * as this instance remains connected to ZooKeeper.
         *
-        * @param pathInZooKeeper Path in ZooKeeper to get the state handle 
from (expected to
-        *                        exist and start with a '/').
-        * @return The state handle
-        * @throws Exception If a ZooKeeper or state handle operation fails
+        * @param pathInZooKeeper Path to the ZooKeeper node which contains the 
state handle
+        * @return The retrieved state handle from the specified ZooKeeper node
+        * @throws IOException Thrown if the method failed to deserialize the 
stored state handle
+        * @throws Exception Thrown if a ZooKeeper operation failed
         */
-       @SuppressWarnings("unchecked")
-       public RetrievableStateHandle<T> get(String pathInZooKeeper) throws 
Exception {
-               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-               byte[] data;
-
-               try {
-                       data = client.getData().forPath(pathInZooKeeper);
-               } catch (Exception e) {
-                       throw new Exception("Failed to retrieve state handle 
data under " + pathInZooKeeper +
-                               " from ZooKeeper.", e);
-               }
-
-               try {
-                       return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
-               } catch (IOException | ClassNotFoundException e) {
-                       throw new IOException("Failed to deserialize state 
handle from ZooKeeper data from " +
-                               pathInZooKeeper + '.', e);
-               }
+       public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) 
throws Exception {
+               return get(pathInZooKeeper, true);
        }
 
        /**
@@ -270,7 +272,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        }
 
        /**
-        * Gets all available state handles from ZooKeeper.
+        * Gets all available state handles from ZooKeeper and locks the 
respective state nodes.
         *
         * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
         *
@@ -278,7 +280,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        @SuppressWarnings("unchecked")
-       public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws 
Exception {
+       public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() 
throws Exception {
                final List<Tuple2<RetrievableStateHandle<T>, String>> 
stateHandles = new ArrayList<>();
 
                boolean success = false;
@@ -300,7 +302,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                                        path = "/" + path;
 
                                        try {
-                                               final RetrievableStateHandle<T> 
stateHandle = get(path);
+                                               final RetrievableStateHandle<T> 
stateHandle = getAndLock(path);
                                                stateHandles.add(new 
Tuple2<>(stateHandle, path));
                                        } catch 
(KeeperException.NoNodeException ignored) {
                                                // Concurrent deletion, retry
@@ -323,7 +325,8 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
 
        /**
-        * Gets all available state handles from ZooKeeper sorted by name 
(ascending).
+        * Gets all available state handles from ZooKeeper sorted by name 
(ascending) and locks the
+        * respective state nodes.
         *
         * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
         *
@@ -331,7 +334,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        @SuppressWarnings("unchecked")
-       public List<Tuple2<RetrievableStateHandle<T>, String>> 
getAllSortedByName() throws Exception {
+       public List<Tuple2<RetrievableStateHandle<T>, String>> 
getAllSortedByNameAndLock() throws Exception {
                final List<Tuple2<RetrievableStateHandle<T>, String>> 
stateHandles = new ArrayList<>();
 
                boolean success = false;
@@ -355,14 +358,16 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                                        path = "/" + path;
 
                                        try {
-                                               final RetrievableStateHandle<T> 
stateHandle = get(path);
+                                               final RetrievableStateHandle<T> 
stateHandle = getAndLock(path);
                                                stateHandles.add(new 
Tuple2<>(stateHandle, path));
                                        } catch 
(KeeperException.NoNodeException ignored) {
                                                // Concurrent deletion, retry
                                                continue retry;
                                        } catch (IOException ioException) {
                                                LOG.warn("Could not get all 
ZooKeeper children. Node {} contained " +
-                                                       "corrupted data. 
Ignoring this node.", path, ioException);
+                                                       "corrupted data. 
Releasing and trying to remove this node.", path, ioException);
+
+                                               releaseAndTryRemove(path);
                                        }
                                }
 
@@ -370,6 +375,9 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
                                // Check for concurrent modifications
                                success = initialCVersion == finalCVersion;
+
+                               // we don't have to release all locked nodes in 
case of a concurrent modification, because we
+                               // will retrieve them in the next iteration 
again.
                        }
                }
 
@@ -377,75 +385,306 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        }
 
        /**
-        * Removes a state handle from ZooKeeper.
+        * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
+        * The deletion of the state node is executed asynchronously.
         *
-        * <p><strong>Important</strong>: this does not discard the state 
handle. If you want to
-        * discard the state handle call {@link #removeAndDiscardState(String)}.
+        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
+        * has been executed.
         *
         * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
         * @throws Exception If the ZooKeeper operation fails
         */
-       public void remove(String pathInZooKeeper) throws Exception {
-               checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-               
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+       public void releaseAndTryRemove(String pathInZooKeeper) throws 
Exception {
+               releaseAndTryRemove(pathInZooKeeper, null);
        }
 
        /**
-        * Removes a state handle from ZooKeeper asynchronously.
+        * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
+        * The deletion of the state node is executed asynchronously. After the 
state node has been deleted, the given
+        * callback is called with the {@link RetrievableStateHandle} of the 
deleted state node.
         *
-        * <p><strong>Important</strong>: this does not discard the state 
handle. If you want to
-        * discard the state handle call {@link #removeAndDiscardState(String)}.
+        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
+        * has been executed.
         *
-        * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
-        * @param callback        The callback after the operation finishes
+        * @param pathInZooKeeper Path of state handle to remove
+        * @param callback The callback to execute after a successful deletion. 
Null if no action needs to be executed.
         * @throws Exception If the ZooKeeper operation fails
         */
-       public void remove(String pathInZooKeeper, BackgroundCallback callback) 
throws Exception {
+       public void releaseAndTryRemove(
+                       String pathInZooKeeper,
+                       @Nullable final RemoveCallback<T> callback) throws 
Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-               checkNotNull(callback, "Background callback");
 
-               
client.delete().deletingChildrenIfNeeded().inBackground(callback, 
executor).forPath(pathInZooKeeper);
+               final String path = normalizePath(pathInZooKeeper);
+
+               RetrievableStateHandle<T> stateHandle = null;
+
+               try {
+                       stateHandle = get(path, false);
+               } catch (Exception e) {
+                       LOG.warn("Could not retrieve the state handle from node 
" + path + '.', e);
+               }
+
+               release(pathInZooKeeper);
+
+               final BackgroundCallback backgroundCallback = new 
RemoveBackgroundCallback<>(stateHandle, callback, path);
+
+               client.delete().inBackground(backgroundCallback, 
executor).forPath(path);
        }
 
        /**
-        * Discards a state handle and removes it from ZooKeeper.
+        * Releases all lock nodes of this ZooKeeperStateHandleStores and tries 
to remove all state nodes which
+        * are not locked anymore.
         *
-        * <p>If you only want to remove the state handle in ZooKeeper call 
{@link #remove(String)}.
+        * <p>The delete operation is executed asynchronously
         *
-        * @param pathInZooKeeper Path of state handle to discard (expected to 
start with a '/')
-        * @throws Exception If the ZooKeeper or state handle operation fails
+        * @throws Exception if the delete operation fails
         */
-       public void removeAndDiscardState(String pathInZooKeeper) throws 
Exception {
+       public void releaseAndTryRemoveAll() throws Exception {
+               Collection<String> children = getAllPaths();
+
+               Exception exception = null;
+
+               for (String child : children) {
+                       try {
+                               releaseAndTryRemove('/' + child);
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+               }
+
+               if (exception != null) {
+                       throw new Exception("Could not properly release and try 
removing all state nodes.", exception);
+               }
+       }
+
+       /**
+        * Releases the lock from the node under the given ZooKeeper path. If 
no lock exists, then nothing happens.
+        *
+        * @param pathInZooKeeper Path describing the ZooKeeper node
+        * @throws Exception if the delete operation of the lock node fails
+        */
+       public void release(String pathInZooKeeper) throws Exception {
+               final String path = normalizePath(pathInZooKeeper);
+
+               try {
+                       client.delete().forPath(getLockPath(path));
+               } catch (KeeperException.NoNodeException ignored) {
+                       // we have never locked this node
+               } catch (Exception e) {
+                       throw new Exception("Could not release the lock: " + 
getLockPath(pathInZooKeeper) + '.', e);
+               }
+       }
+
+       /**
+        * Releases all lock nodes of this ZooKeeperStateHandleStore.
+        *
+        * @throws Exception if the delete operation of a lock file fails
+        */
+       public void releaseAll() throws Exception {
+               Collection<String> children = getAllPaths();
+
+               Exception exception = null;
+
+               for (String child: children) {
+                       try {
+                               release(child);
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+               }
+
+               if (exception != null) {
+                       throw new Exception("Could not properly release all 
state nodes.", exception);
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------------------
+       // Protected methods
+       // 
---------------------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the path for the lock node relative to the given path.
+        *
+        * @param rootPath Root path under which the lock node shall be created
+        * @return Path for the lock node
+        */
+       protected String getLockPath(String rootPath) {
+               return rootPath + '/' + lockNode;
+       }
+
+       // 
---------------------------------------------------------------------------------------------------------
+       // Private methods
+       // 
---------------------------------------------------------------------------------------------------------
+
+       /**
+        * Gets a state handle from ZooKeeper and optionally locks it.
+        *
+        * @param pathInZooKeeper Path in ZooKeeper to get the state handle from
+        * @param lock True if we should lock the node; otherwise false
+        * @return The state handle
+        * @throws IOException Thrown if the method failed to deserialize the 
stored state handle
+        * @throws Exception Thrown if a ZooKeeper operation failed
+        */
+       @SuppressWarnings("unchecked")
+       private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean 
lock) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-               RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
+               final String path = normalizePath(pathInZooKeeper);
+
+               if (lock) {
+                       // try to lock the node
+                       try {
+                               
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
+                       } catch (KeeperException.NodeExistsException ignored) {
+                               // we have already created the lock
+                       } catch (KeeperException.NoNodeException e) {
+                               throw new Exception("Cannot lock the node " + 
path + " since it does not exist.", e);
+                       }
+               }
+
+               boolean success = false;
+
+               try {
+                       byte[] data;
+
+                       try {
+                               data = client.getData().forPath(path);
+                       } catch (Exception e) {
+                               throw new Exception("Failed to retrieve state 
handle data under " + path +
+                                       " from ZooKeeper.", e);
+                       }
+
+                       try {
+                               RetrievableStateHandle<T> 
retrievableStateHandle = InstantiationUtil.deserializeObject(
+                                       data,
+                                       
Thread.currentThread().getContextClassLoader());
 
-               // Delete the state handle from ZooKeeper first
-               
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+                               success = true;
 
-               // Discard the state handle only after it has been successfully 
deleted from ZooKeeper.
-               // Otherwise we might enter an illegal state after failures 
(with a state handle in
-               // ZooKeeper, which has already been discarded).
-               stateHandle.discardState();
+                               return retrievableStateHandle;
+                       } catch (IOException | ClassNotFoundException e) {
+                               throw new IOException("Failed to deserialize 
state handle from ZooKeeper data from " +
+                                       path + '.', e);
+                       }
+               } finally {
+                       if (!success && lock) {
+                               // release the lock
+                               release(path);
+                       }
+               }
        }
 
        /**
-        * Discards all available state handles and removes them from ZooKeeper.
+        * Makes sure that every path starts with a "/"
         *
-        * @throws Exception If a ZooKeeper or state handle operation fails
+        * @param path Path to normalize
+        * @return Normalized path such that it starts with a "/"
         */
-       public void removeAndDiscardAllState() throws Exception {
-               final List<Tuple2<RetrievableStateHandle<T>, String>> 
allStateHandles = getAll();
+       private static String normalizePath(String path) {
+               if (path.startsWith("/")) {
+                       return path;
+               } else {
+                       return '/' + path;
+               }
+       }
 
-               ZKPaths.deleteChildren(
-                               client.getZookeeperClient().getZooKeeper(),
-                               ZKPaths.fixForNamespace(client.getNamespace(), 
"/"),
-                               false);
+       // 
---------------------------------------------------------------------------------------------------------
+       // Utility classes
+       // 
---------------------------------------------------------------------------------------------------------
 
-               // Discard the state handles only after they have been 
successfully deleted from ZooKeeper.
-               for (Tuple2<RetrievableStateHandle<T>, String> 
stateHandleAndPath : allStateHandles) {
-                       stateHandleAndPath.f0.discardState();
+       /**
+        * Callback which is executed when removing a node from ZooKeeper. The 
callback will call the given
+        * {@link RemoveCallback} if it is not null. Afterwards, it will 
discard the given {@link RetrievableStateHandle}
+        * if it is not null.
+        *
+        * @param <T> Type of the value stored in the RetrievableStateHandle
+        */
+       private static final class RemoveBackgroundCallback<T extends 
Serializable> implements BackgroundCallback {
+               @Nullable
+               private final RetrievableStateHandle<T> stateHandle;
+
+               @Nullable
+               private final RemoveCallback<T> callback;
+
+               private final String pathInZooKeeper;
+
+               private RemoveBackgroundCallback(
+                       @Nullable RetrievableStateHandle<T> stateHandle,
+                       @Nullable RemoveCallback<T> callback,
+                       String pathInZooKeeper) {
+
+                       this.stateHandle = stateHandle;
+                       this.callback = callback;
+                       this.pathInZooKeeper = 
Preconditions.checkNotNull(pathInZooKeeper);
                }
+
+               @Override
+               public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception {
+                       try {
+                               if (event.getType() == CuratorEventType.DELETE) 
{
+                                       final KeeperException.Code resultCode = 
KeeperException.Code.get(event.getResultCode());
+
+                                       if (resultCode == 
KeeperException.Code.OK) {
+                                               Exception exception = null;
+
+                                               if (null != callback) {
+                                                       try {
+                                                               
callback.apply(stateHandle);
+                                                       } catch (Throwable e) {
+                                                               exception = new 
Exception("Could not execute delete action for node " +
+                                                                       
pathInZooKeeper + '.', e);
+                                                       }
+                                               }
+
+                                               if (stateHandle != null) {
+                                                       try {
+                                                               // Discard the 
state handle
+                                                               
stateHandle.discardState();
+                                                       } catch (Throwable e) {
+                                                               Exception 
newException = new Exception("Could not discard state handle of node " +
+                                                                       
pathInZooKeeper + '.', e);
+
+                                                               if (exception 
== null) {
+                                                                       
exception = newException;
+                                                               } else {
+                                                                       
exception.addSuppressed(newException);
+                                                               }
+                                                       }
+                                               }
+
+                                               if (exception != null) {
+                                                       throw exception;
+                                               }
+                                       } else if (resultCode == 
KeeperException.Code.NOTEMPTY) {
+                                               // Could not delete the node 
because it still contains children/locks
+                                               LOG.debug("Could not delete 
node " + pathInZooKeeper + " because it is still locked.");
+                                       } else {
+                                               throw new 
IllegalStateException("Unexpected result code " +
+                                                       resultCode.name() + " 
in '" + event + "' callback.");
+                                       }
+                               } else {
+                                       throw new 
IllegalStateException("Unexpected event type " +
+                                               event.getType() + " in '" + 
event + "' callback.");
+                               }
+                       } catch (Exception e) {
+                               LOG.warn("Failed to run callback for delete 
operation on node " + pathInZooKeeper + '.', e);
+                       }
+
+               }
+       };
+
+       /**
+        * Callback interface for remove calls
+        */
+       public interface RemoveCallback<T extends Serializable> {
+               /**
+                * Callback method. The parameter can be null if the {@link 
RetrievableStateHandle} could not be retrieved
+                * from ZooKeeper.
+                *
+                * @param value RetrievableStateHandle retrieved from 
ZooKeeper, null if it was not retrievable
+                * @throws FlinkException If the callback failed
+                */
+               void apply(@Nullable RetrievableStateHandle<T> value) throws 
FlinkException;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 94bd12f..985c662 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -307,6 +308,14 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        }
                }
 
+               public boolean awaitDiscard(long timeout) throws 
InterruptedException {
+                       if (discardLatch != null) {
+                               return discardLatch.await(timeout, 
TimeUnit.MILLISECONDS);
+                       } else {
+                               return false;
+                       }
+               }
+
                @Override
                public boolean equals(Object o) {
                        if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 3fd7f1b..0d93289 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -24,50 +24,52 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper 
state handling.
  */
 public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpointStoreTest {
 
-       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+       private static final ZooKeeperTestEnvironment ZOOKEEPER = new 
ZooKeeperTestEnvironment(1);
 
-       private final static String CheckpointsPath = "/checkpoints";
+       private static final String CHECKPOINT_PATH = "/checkpoints";
 
        @AfterClass
        public static void tearDown() throws Exception {
-               if (ZooKeeper != null) {
-                       ZooKeeper.shutdown();
+               if (ZOOKEEPER != null) {
+                       ZOOKEEPER.shutdown();
                }
        }
 
        @Before
        public void cleanUp() throws Exception {
-               ZooKeeper.deleteAll();
+               ZOOKEEPER.deleteAll();
        }
 
        @Override
-       protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
-                       int maxNumberOfCheckpointsToRetain) throws Exception {
-
+       protected ZooKeeperCompletedCheckpointStore 
createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception 
{
                return new 
ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
-                       ZooKeeper.createClient(), CheckpointsPath, new 
RetrievableStateStorageHelper<CompletedCheckpoint>() {
-                       @Override
-                       public RetrievableStateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
-                               return new HeapRetrievableStateHandle<>(state);
-                       }
-               }, Executors.directExecutor());
+                       ZOOKEEPER.getClient(),
+                       CHECKPOINT_PATH,
+                       new HeapStateStorageHelper(),
+                       Executors.directExecutor());
        }
 
        // 
---------------------------------------------------------------------------------------------
@@ -95,7 +97,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpoint
                
verifyCheckpointRegistered(expected[2].getOperatorStates().values(), 
checkpoints.sharedStateRegistry);
 
                // All three should be in ZK
-               assertEquals(3, 
ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+               assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
                resetCheckpoint(expected[0].getOperatorStates().values());
@@ -105,7 +107,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                // Recover TODO!!! clear registry!
                checkpoints.recover();
 
-               assertEquals(3, 
ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+               assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
                assertEquals(expected[2], checkpoints.getLatestCheckpoint());
 
@@ -130,18 +132,18 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         */
        @Test
        public void testShutdownDiscardsCheckpoints() throws Exception {
-               CuratorFramework client = ZooKeeper.getClient();
+               CuratorFramework client = ZOOKEEPER.getClient();
 
                CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
-               assertNotNull(client.checkExists().forPath(CheckpointsPath + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+               assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
                store.shutdown(JobStatus.FINISHED);
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
-               assertNull(client.checkExists().forPath(CheckpointsPath + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+               assertNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
                store.recover();
 
@@ -149,24 +151,30 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
        }
 
        /**
-        * Tests that suspends keeps all checkpoints (as they can be recovered
-        * later by the ZooKeeper store).
+        * Tests that suspends keeps all checkpoints (so that they can be 
recovered
+        * later by the ZooKeeper store). Furthermore, suspending a job should 
release
+        * all locks.
         */
        @Test
        public void testSuspendKeepsCheckpoints() throws Exception {
-               CuratorFramework client = ZooKeeper.getClient();
+               CuratorFramework client = ZOOKEEPER.getClient();
 
                CompletedCheckpointStore store = createCompletedCheckpoints(1);
                TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
                store.addCheckpoint(checkpoint);
                assertEquals(1, store.getNumberOfRetainedCheckpoints());
-               assertNotNull(client.checkExists().forPath(CheckpointsPath + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+               assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
                store.shutdown(JobStatus.SUSPENDED);
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
-               assertNotNull(client.checkExists().forPath(CheckpointsPath + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+
+               final String checkpointPath = CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID());
+               Stat stat = client.checkExists().forPath(checkpointPath);
+
+               assertNotNull("The checkpoint node should exist.", stat);
+               assertEquals("The checkpoint node should not be locked.", 0, 
stat.getNumChildren());
 
                // Recover again
                store.recover();
@@ -201,24 +209,91 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(checkpoints.get(checkpoints.size() -1), 
latestCheckpoint);
        }
 
+       /**
+        * FLINK-6612
+        *
+        * Checks that a concurrent checkpoint completion won't discard a 
checkpoint which has been
+        * recovered by a different completed checkpoint store.
+        */
+       @Test
+       public void testConcurrentCheckpointOperations() throws Exception {
+               final int numberOfCheckpoints = 1;
+               final long waitingTimeout = 50L;
+
+               ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = 
createCompletedCheckpoints(numberOfCheckpoints);
+               ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = 
createCompletedCheckpoints(numberOfCheckpoints);
+
+               TestCompletedCheckpoint completedCheckpoint = 
createCheckpoint(1);
+
+               // complete the first checkpoint
+               zkCheckpointStore1.addCheckpoint(completedCheckpoint);
+
+               // recover the checkpoint by a different checkpoint store
+               zkCheckpointStore2.recover();
+
+               CompletedCheckpoint recoveredCheckpoint = 
zkCheckpointStore2.getLatestCheckpoint();
+               assertTrue(recoveredCheckpoint instanceof 
TestCompletedCheckpoint);
+               TestCompletedCheckpoint recoveredTestCheckpoint = 
(TestCompletedCheckpoint) recoveredCheckpoint;
+
+               // Check that the recovered checkpoint is not yet discarded
+               assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+               // complete another checkpoint --> this should remove the first 
checkpoint from the store
+               // because the number of retained checkpoints == 1
+               TestCompletedCheckpoint completedCheckpoint2 = 
createCheckpoint(2);
+               zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
+
+               List<CompletedCheckpoint> allCheckpoints = 
zkCheckpointStore1.getAllCheckpoints();
+
+               // check that we have removed the first checkpoint from 
zkCompletedStore1
+               assertEquals(Collections.singletonList(completedCheckpoint2), 
allCheckpoints);
+
+               // lets wait a little bit to see that no discard operation will 
be executed
+               assertFalse("The checkpoint should not have been discarded.", 
recoveredTestCheckpoint.awaitDiscard(waitingTimeout));
+
+               // check that we have not discarded the first completed 
checkpoint
+               assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+               TestCompletedCheckpoint completedCheckpoint3 = 
createCheckpoint(3);
+
+               // this should release the last lock on completedCheckoint and 
thus discard it
+               zkCheckpointStore2.addCheckpoint(completedCheckpoint3);
+
+               // the checkpoint should be discarded eventually because there 
is no lock on it anymore
+               recoveredTestCheckpoint.awaitDiscard();
+       }
+
+
+       static class HeapStateStorageHelper implements 
RetrievableStateStorageHelper<CompletedCheckpoint> {
+               @Override
+               public RetrievableStateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
+                       return new HeapRetrievableStateHandle<>(state);
+               }
+       }
+
        static class HeapRetrievableStateHandle<T extends Serializable> 
implements RetrievableStateHandle<T> {
 
                private static final long serialVersionUID = -268548467968932L;
 
+               private static AtomicInteger nextKey = new AtomicInteger(0);
+
+               private static HashMap<Integer, Object> stateMap = new 
HashMap<>();
+
+               private final int key;
+
                public HeapRetrievableStateHandle(T state) {
-                       this.state = state;
+                       key = nextKey.getAndIncrement();
+                       stateMap.put(key, state);
                }
 
-               private T state;
-
                @Override
                public T retrieveState() throws Exception {
-                       return state;
+                       return (T) stateMap.get(key);
                }
 
                @Override
                public void discardState() throws Exception {
-                       state = null;
+                       stateMap.remove(key);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0d22dc6..7d22d8e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -110,7 +110,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
 
                ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
                
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName();
+               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
                final int numCheckpointsToRetain = 1;
 
@@ -126,7 +126,6 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                when(
                        client
                                .delete()
-                               .deletingChildrenIfNeeded()
                                .inBackground(any(BackgroundCallback.class), 
any(Executor.class))
                ).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
                        @Override
@@ -150,13 +149,13 @@ public class ZooKeeperCompletedCheckpointStoreTest 
extends TestLogger {
                });
 
                final String checkpointsPath = "foobar";
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateSotrage = mock(RetrievableStateStorageHelper.class);
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage = mock(RetrievableStateStorageHelper.class);
 
                ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
                        numCheckpointsToRetain,
                        client,
                        checkpointsPath,
-                       stateSotrage,
+                       stateStorage,
                        Executors.directExecutor());
 
                zooKeeperCompletedCheckpointStore.recover();
@@ -209,9 +208,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                                
                                return retrievableStateHandle;
                        }
-               }).when(zookeeperStateHandleStoreMock).add(anyString(), 
any(CompletedCheckpoint.class));
+               }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), 
any(CompletedCheckpoint.class));
                
-               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), 
any(BackgroundCallback.class));
+               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(),
 any(ZooKeeperStateHandleStore.RemoveCallback.class));
                
                final int numCheckpointsToRetain = 1;
                final String checkpointsPath = "foobar";

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
deleted file mode 100644
index 4dc4c6b..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
- *
- * <p> Tests include:
- * <ul>
- * <li>Expected usage of operations</li>
- * <li>Correct ordering of ZooKeeper and state handle operations</li>
- * </ul>
- */
-public class ZooKeeperStateHandleStoreITCase extends TestLogger {
-
-       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (ZooKeeper != null) {
-                       ZooKeeper.shutdown();
-               }
-       }
-
-       @Before
-       public void cleanUp() throws Exception {
-               ZooKeeper.deleteAll();
-       }
-
-       /**
-        * Tests add operation with default {@link CreateMode}.
-        */
-       @Test
-       public void testAdd() throws Exception {
-               LongStateStorage longStateStorage = new LongStateStorage();
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZooKeeper.getClient(), longStateStorage, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testAdd";
-               final Long state = 1239712317L;
-
-               // Test
-               store.add(pathInZooKeeper, state);
-
-               // Verify
-               // State handle created
-               assertEquals(1, store.getAll().size());
-               assertEquals(state, store.get(pathInZooKeeper).retrieveState());
-
-               // Path created and is persistent
-               Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-               assertNotNull(stat);
-               assertEquals(0, stat.getEphemeralOwner());
-
-               // Data is equal
-               @SuppressWarnings("unchecked")
-               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
-                               
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).retrieveState();
-
-               assertEquals(state, actual);
-       }
-
-       /**
-        * Tests that {@link CreateMode} is respected.
-        */
-       @Test
-       public void testAddWithCreateMode() throws Exception {
-               LongStateStorage longStateStorage = new LongStateStorage();
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZooKeeper.getClient(), longStateStorage, 
Executors.directExecutor());
-
-               // Config
-               Long state = 3457347234L;
-
-               CreateMode[] modes = CreateMode.values();
-               for (int i = 0; i < modes.length; i++) {
-                       CreateMode mode = modes[i];
-                       state += i;
-
-                       String pathInZooKeeper = "/testAddWithCreateMode" + 
mode.name();
-
-                       // Test
-                       store.add(pathInZooKeeper, state, mode);
-
-                       if (mode.isSequential()) {
-                               // Figure out the sequential ID
-                               List<String> paths = 
ZooKeeper.getClient().getChildren().forPath("/");
-                               for (String p : paths) {
-                                       if 
(p.startsWith("testAddWithCreateMode" + mode.name())) {
-                                               pathInZooKeeper = "/" + p;
-                                               break;
-                                       }
-                               }
-                       }
-
-                       // Verify
-                       // State handle created
-                       assertEquals(i + 1, store.getAll().size());
-                       assertEquals(state, 
longStateStorage.getStateHandles().get(i).retrieveState());
-
-                       // Path created
-                       Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-
-                       assertNotNull(stat);
-
-                       // Is ephemeral or persistent
-                       if (mode.isEphemeral()) {
-                               assertTrue(stat.getEphemeralOwner() != 0);
-                       }
-                       else {
-                               assertEquals(0, stat.getEphemeralOwner());
-                       }
-
-                       // Data is equal
-                       @SuppressWarnings("unchecked")
-                       Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
-                                       
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                                       
ClassLoader.getSystemClassLoader())).retrieveState();
-
-                       assertEquals(state, actual);
-               }
-       }
-
-       /**
-        * Tests that an existing path throws an Exception.
-        */
-       @Test(expected = Exception.class)
-       public void testAddAlreadyExistingPath() throws Exception {
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               
ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
-
-               store.add("/testAddAlreadyExistingPath", 1L);
-       }
-
-       /**
-        * Tests that the created state handle is discarded if ZooKeeper create 
fails.
-        */
-       @Test
-       public void testAddDiscardStateHandleAfterFailure() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               CuratorFramework client = spy(ZooKeeper.getClient());
-               when(client.create()).thenThrow(new RuntimeException("Expected 
test Exception."));
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = 
"/testAddDiscardStateHandleAfterFailure";
-               final Long state = 81282227L;
-
-               try {
-                       // Test
-                       store.add(pathInZooKeeper, state);
-                       fail("Did not throw expected exception");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify
-               // State handle created and discarded
-               assertEquals(1, stateHandleProvider.getStateHandles().size());
-               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
-               assertEquals(1, 
stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
-       }
-
-       /**
-        * Tests that a state handle is replaced.
-        */
-       @Test
-       public void testReplace() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testReplace";
-               final Long initialState = 30968470898L;
-               final Long replaceState = 88383776661L;
-
-               // Test
-               store.add(pathInZooKeeper, initialState);
-               store.replace(pathInZooKeeper, 0, replaceState);
-
-               // Verify
-               // State handles created
-               assertEquals(2, stateHandleProvider.getStateHandles().size());
-               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
-               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
-
-               // Path created and is persistent
-               Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-               assertNotNull(stat);
-               assertEquals(0, stat.getEphemeralOwner());
-
-               // Data is equal
-               @SuppressWarnings("unchecked")
-               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
-                               
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).retrieveState();
-
-               assertEquals(replaceState, actual);
-       }
-
-       /**
-        * Tests that a non existing path throws an Exception.
-        */
-       @Test(expected = Exception.class)
-       public void testReplaceNonExistingPath() throws Exception {
-               RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateStorage, 
Executors.directExecutor());
-
-               store.replace("/testReplaceNonExistingPath", 0, 1L);
-       }
-
-       /**
-        * Tests that the replace state handle is discarded if ZooKeeper 
setData fails.
-        */
-       @Test
-       public void testReplaceDiscardStateHandleAfterFailure() throws 
Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               CuratorFramework client = spy(ZooKeeper.getClient());
-               when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = 
"/testReplaceDiscardStateHandleAfterFailure";
-               final Long initialState = 30968470898L;
-               final Long replaceState = 88383776661L;
-
-               // Test
-               store.add(pathInZooKeeper, initialState);
-
-               try {
-                       store.replace(pathInZooKeeper, 0, replaceState);
-                       fail("Did not throw expected exception");
-               }
-               catch (Exception ignored) {
-               }
-
-               // Verify
-               // State handle created and discarded
-               assertEquals(2, stateHandleProvider.getStateHandles().size());
-               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
-               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
-               assertEquals(1, 
stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
-
-               // Initial value
-               @SuppressWarnings("unchecked")
-               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
-                               
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).retrieveState();
-
-               assertEquals(initialState, actual);
-       }
-
-       /**
-        * Tests get operation.
-        */
-       @Test
-       public void testGetAndExists() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testGetAndExists";
-               final Long state = 311222268470898L;
-
-               // Test
-               assertEquals(-1, store.exists(pathInZooKeeper));
-
-               store.add(pathInZooKeeper, state);
-               RetrievableStateHandle<Long> actual = 
store.get(pathInZooKeeper);
-
-               // Verify
-               assertEquals(state, actual.retrieveState());
-               assertTrue(store.exists(pathInZooKeeper) >= 0);
-       }
-
-       /**
-        * Tests that a non existing path throws an Exception.
-        */
-       @Test(expected = Exception.class)
-       public void testGetNonExistingPath() throws Exception {
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               store.get("/testGetNonExistingPath");
-       }
-
-       /**
-        * Tests that all added state is returned.
-        */
-       @Test
-       public void testGetAll() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testGetAll";
-
-               final Set<Long> expected = new HashSet<>();
-               expected.add(311222268470898L);
-               expected.add(132812888L);
-               expected.add(27255442L);
-               expected.add(11122233124L);
-
-               // Test
-               for (long val : expected) {
-                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
-               }
-
-               for (Tuple2<RetrievableStateHandle<Long>, String> val : 
store.getAll()) {
-                       assertTrue(expected.remove(val.f0.retrieveState()));
-               }
-               assertEquals(0, expected.size());
-       }
-
-       /**
-        * Tests that the state is returned sorted.
-        */
-       @Test
-       public void testGetAllSortedByName() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testGetAllSortedByName";
-
-               final Long[] expected = new Long[] {
-                               311222268470898L, 132812888L, 27255442L, 
11122233124L };
-
-               // Test
-               for (long val : expected) {
-                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
-               }
-
-               List<Tuple2<RetrievableStateHandle<Long>, String>> actual = 
store.getAllSortedByName();
-               assertEquals(expected.length, actual.size());
-
-               for (int i = 0; i < expected.length; i++) {
-                       assertEquals(expected[i], 
actual.get(i).f0.retrieveState());
-               }
-       }
-
-       /**
-        * Tests that state handles are correctly removed.
-        */
-       @Test
-       public void testRemove() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testRemove";
-               final Long state = 27255442L;
-
-               store.add(pathInZooKeeper, state);
-
-               // Test
-               store.remove(pathInZooKeeper);
-
-               // Verify discarded
-               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
-       }
-
-       /**
-        * Tests that state handles are correctly removed with a callback.
-        */
-       @Test
-       public void testRemoveWithCallback() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testRemoveWithCallback";
-               final Long state = 27255442L;
-
-               store.add(pathInZooKeeper, state);
-
-               final CountDownLatch sync = new CountDownLatch(1);
-               BackgroundCallback callback = mock(BackgroundCallback.class);
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               sync.countDown();
-                               return null;
-                       }
-               }).when(callback).processResult(eq(ZooKeeper.getClient()), 
any(CuratorEvent.class));
-
-               // Test
-               store.remove(pathInZooKeeper, callback);
-
-               // Verify discarded and callback called
-               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
-
-               sync.await();
-
-               verify(callback, times(1))
-                               .processResult(eq(ZooKeeper.getClient()), 
any(CuratorEvent.class));
-       }
-
-       /**
-        * Tests that state handles are correctly discarded.
-        */
-       @Test
-       public void testRemoveAndDiscardState() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testDiscard";
-               final Long state = 27255442L;
-
-               store.add(pathInZooKeeper, state);
-
-               // Test
-               store.removeAndDiscardState(pathInZooKeeper);
-
-               // Verify discarded
-               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
-       }
-
-       /** Tests that all state handles are correctly discarded. */
-       @Test
-       public void testRemoveAndDiscardAllState() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testDiscardAll";
-
-               final Set<Long> expected = new HashSet<>();
-               expected.add(311222268470898L);
-               expected.add(132812888L);
-               expected.add(27255442L);
-               expected.add(11122233124L);
-
-               // Test
-               for (long val : expected) {
-                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
-               }
-
-               store.removeAndDiscardAllState();
-
-               // Verify all discarded
-               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
-       }
-
-       /**
-        * Tests that the ZooKeeperStateHandleStore can handle corrupted data 
by ignoring the respective
-        * ZooKeeper ZNodes.
-        */
-       @Test
-       public void testCorruptedData() throws Exception {
-               LongStateStorage stateStorage = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                       ZooKeeper.getClient(),
-                       stateStorage,
-                       Executors.directExecutor());
-
-               final Collection<Long> input = new HashSet<>();
-               input.add(1L);
-               input.add(2L);
-               input.add(3L);
-
-               for (Long aLong : input) {
-                       store.add("/" + aLong, aLong);
-               }
-
-               // corrupt one of the entries
-               ZooKeeper.getClient().setData().forPath("/" + 2, new byte[2]);
-
-               List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = 
store.getAll();
-
-               Collection<Long> expected = new HashSet<>(input);
-               expected.remove(2L);
-
-               Collection<Long> actual = new HashSet<>(expected.size());
-
-               for (Tuple2<RetrievableStateHandle<Long>, String> entry : 
allEntries) {
-                       actual.add(entry.f0.retrieveState());
-               }
-
-               assertEquals(expected, actual);
-
-               // check the same for the all sorted by name call
-               allEntries = store.getAllSortedByName();
-
-               actual.clear();
-
-               for (Tuple2<RetrievableStateHandle<Long>, String> entry : 
allEntries) {
-                       actual.add(entry.f0.retrieveState());
-               }
-
-               assertEquals(expected, actual);
-       }
-
-       // 
---------------------------------------------------------------------------------------------
-       // Simple test helpers
-       // 
---------------------------------------------------------------------------------------------
-
-       private static class LongStateStorage implements 
RetrievableStateStorageHelper<Long> {
-
-               private final List<LongRetrievableStateHandle> stateHandles = 
new ArrayList<>();
-
-               @Override
-               public RetrievableStateHandle<Long> store(Long state) throws 
Exception {
-                       LongRetrievableStateHandle stateHandle = new 
LongRetrievableStateHandle(state);
-                       stateHandles.add(stateHandle);
-
-                       return stateHandle;
-               }
-
-               List<LongRetrievableStateHandle> getStateHandles() {
-                       return stateHandles;
-               }
-       }
-
-       private static class LongRetrievableStateHandle implements 
RetrievableStateHandle<Long> {
-
-               private static final long serialVersionUID = 
-3555329254423838912L;
-
-               private final Long state;
-
-               private int numberOfDiscardCalls;
-
-               public LongRetrievableStateHandle(Long state) {
-                       this.state = state;
-               }
-
-               @Override
-               public Long retrieveState() throws Exception {
-                       return state;
-               }
-
-               @Override
-               public void discardState() throws Exception {
-                       numberOfDiscardCalls++;
-               }
-
-               @Override
-               public long getStateSize() {
-                       return 0;
-               }
-
-               public int getNumberOfDiscardCalls() {
-                       return numberOfDiscardCalls;
-               }
-       }
-}

Reply via email to