CURATOR-161 - Added support for guaranteed removal of watches. This includes refactoring the FailedDeleteManager code into a FailedOperationManager to allow subclassing.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/22d034af Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/22d034af Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/22d034af Branch: refs/heads/CURATOR-161 Commit: 22d034af90987940420649c5f320e8dc09910c8a Parents: 04caf36 Author: Cameron McKenzie <[email protected]> Authored: Wed May 13 09:28:45 2015 +1000 Committer: Cameron McKenzie <[email protected]> Committed: Wed May 13 09:28:45 2015 +1000 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 4 + .../curator/framework/api/DeleteBuilder.java | 2 +- .../curator/framework/api/Guaranteeable.java | 20 +-- .../framework/api/GuaranteeableDelete.java | 39 ++++++ .../framework/api/RemoveWatchesType.java | 2 +- .../framework/imps/CuratorFrameworkImpl.java | 8 ++ .../framework/imps/DeleteBuilderImpl.java | 4 +- .../framework/imps/FailedDeleteManager.java | 39 +----- .../framework/imps/FailedOperationManager.java | 65 ++++++++++ .../imps/FailedRemoveWatchManager.java | 56 ++++++++ .../framework/imps/NamespaceWatcherMap.java | 8 ++ .../imps/RemoveWatchesBuilderImpl.java | 56 ++++++-- .../framework/imps/TestFailedDeleteManager.java | 9 +- .../framework/imps/TestRemoveWatches.java | 129 +++++++++++++++++++ 14 files changed, 377 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 4b30fd4..2bce552 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -216,7 +216,11 @@ public interface CuratorFramework extends Closeable * Call this method on watchers you are no longer interested in. * * @param watcher the watcher + * + * @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references + * when they are no longer used. */ + @Deprecated public void clearWatcherReferences(Watcher watcher); /** http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java index 3a3faf7..893e825 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java @@ -18,6 +18,6 @@ */ package org.apache.curator.framework.api; -public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable +public interface DeleteBuilder extends GuaranteeableDelete, ChildrenDeletable { } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java index 481911b..b43d6b0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java @@ -18,23 +18,15 @@ */ package org.apache.curator.framework.api; -public interface Guaranteeable extends BackgroundVersionable +public interface Guaranteeable<T> { /** - * <p> - * Solves this edge case: deleting a node can fail due to connection issues. Further, - * if the node was ephemeral, the node will not get auto-deleted as the session is still valid. - * This can wreak havoc with lock implementations. - * </p> - * - * <p> - * When <code>guaranteed</code> is set, Curator will record failed node deletions and - * attempt to delete them in the background until successful. NOTE: you will still get an - * exception when the deletion fails. But, you can be assured that as long as the - * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node. - * </p> + * Solves edge cases where an operation may succeed on the server but connection failure occurs before a + * response can be successfully returned to the client. + * + * @see org.apache.curator.framework.api.GuaranteeableDelete * * @return this */ - public ChildrenDeletable guaranteed(); + public T guaranteed(); } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java new file mode 100644 index 0000000..d04e7ea --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * <p> + * Solves this edge case: deleting a node can fail due to connection issues. Further, + * if the node was ephemeral, the node will not get auto-deleted as the session is still valid. + * This can wreak havoc with lock implementations. + * </p> + * + * <p> + * When <code>guaranteed</code> is set, Curator will record failed node deletions and + * attempt to delete them in the background until successful. NOTE: you will still get an + * exception when the deletion fails. But, you can be assured that as long as the + * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node. + * </p> + * + * @return this + */ +public interface GuaranteeableDelete extends Guaranteeable<ChildrenDeletable>, BackgroundVersionable +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java index 1123afd..3112eac 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java @@ -6,7 +6,7 @@ import org.apache.zookeeper.Watcher.WatcherType; * Builder to allow the specification of whether it is acceptable to remove client side watch information * in the case where ZK cannot be contacted. */ -public interface RemoveWatchesType extends RemoveWatchesLocal +public interface RemoveWatchesType extends RemoveWatchesLocal, Guaranteeable<BackgroundPathableQuietly<Void>> { /** http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 5caff7d..b4a1d93 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -72,6 +72,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>(); private final byte[] defaultData; private final FailedDeleteManager failedDeleteManager; + private final FailedRemoveWatchManager failedRemoveWatcherManager; private final CompressionProvider compressionProvider; private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; @@ -147,6 +148,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } failedDeleteManager = new FailedDeleteManager(this); + failedRemoveWatcherManager = new FailedRemoveWatchManager(this); namespaceFacadeCache = new NamespaceFacadeCache(this); } @@ -190,6 +192,7 @@ public class CuratorFrameworkImpl implements CuratorFramework connectionStateManager = parent.connectionStateManager; defaultData = parent.defaultData; failedDeleteManager = parent.failedDeleteManager; + failedRemoveWatcherManager = parent.failedRemoveWatcherManager; compressionProvider = parent.compressionProvider; aclProvider = parent.aclProvider; namespaceFacadeCache = parent.namespaceFacadeCache; @@ -487,6 +490,11 @@ public class CuratorFrameworkImpl implements CuratorFramework { return failedDeleteManager; } + + FailedRemoveWatchManager getFailedRemoveWatcherManager() + { + return failedRemoveWatcherManager; + } RetryLoop newRetryLoop() { http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index 5d8b846..51691dd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -203,7 +203,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> @Override public void retriesExhausted(OperationAndData<String> operationAndData) { - client.getFailedDeleteManager().addFailedDelete(unfixedPath); + client.getFailedDeleteManager().addFailedOperation(unfixedPath); } }; } @@ -253,7 +253,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String> //Only retry a guaranteed delete if it's a retryable error if( RetryLoop.isRetryException(e) && guaranteed ) { - client.getFailedDeleteManager().addFailedDelete(unfixedPath); + client.getFailedDeleteManager().addFailedOperation(unfixedPath); } throw e; } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java index deb7f40..934ae40 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java @@ -19,45 +19,18 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -class FailedDeleteManager +class FailedDeleteManager extends FailedOperationManager<String> { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorFramework client; - - volatile FailedDeleteManagerListener debugListener = null; - - interface FailedDeleteManagerListener - { - public void pathAddedForDelete(String path); - } - FailedDeleteManager(CuratorFramework client) { - this.client = client; + super(client); } - void addFailedDelete(String path) + @Override + protected void executeGuaranteedOperationInBackground(String path) + throws Exception { - if ( debugListener != null ) - { - debugListener.pathAddedForDelete(path); - } - - - if ( client.getState() == CuratorFrameworkState.STARTED ) - { - log.debug("Path being added to guaranteed delete set: " + path); - try - { - client.delete().guaranteed().inBackground().forPath(path); - } - catch ( Exception e ) - { - addFailedDelete(path); - } - } + client.delete().guaranteed().inBackground().forPath(path); } } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java new file mode 100644 index 0000000..a1efde2 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class FailedOperationManager<T> +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + protected final CuratorFramework client; + + volatile FailedOperationManagerListener<T> debugListener = null; + + interface FailedOperationManagerListener<T> + { + public void pathAddedForGuaranteedOperation(T detail); + } + + FailedOperationManager(CuratorFramework client) + { + this.client = client; + } + + void addFailedOperation(T details) + { + if ( debugListener != null ) + { + debugListener.pathAddedForGuaranteedOperation(details); + } + + + if ( client.getState() == CuratorFrameworkState.STARTED ) + { + log.debug("Details being added to guaranteed operation set: " + details); + try + { + executeGuaranteedOperationInBackground(details); + } + catch ( Exception e ) + { + addFailedOperation(details); + } + } + } + + protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java new file mode 100644 index 0000000..f954e2a --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.Watcher; + +class FailedRemoveWatchManager extends FailedOperationManager<FailedRemoveWatchManager.FailedRemoveWatchDetails> +{ + FailedRemoveWatchManager(CuratorFramework client) + { + super(client); + } + + @Override + protected void executeGuaranteedOperationInBackground(FailedRemoveWatchDetails details) + throws Exception + { + if(details.watcher == null) + { + client.watches().removeAll().guaranteed().inBackground().forPath(details.path); + } + else + { + client.watches().remove(details.watcher).guaranteed().inBackground().forPath(details.path); + } + } + + static class FailedRemoveWatchDetails + { + public final String path; + public final Watcher watcher; + + public FailedRemoveWatchDetails(String path, Watcher watcher) + { + this.path = path; + this.watcher = watcher; + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java index e5aecb2..f656ba1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java @@ -70,6 +70,14 @@ class NamespaceWatcherMap implements Closeable { return map.remove(key); } + + /** + * Remove all watchers for a given path + * @param path + */ + void removeAllForPath(String path) { + + } @VisibleForTesting boolean isEmpty() http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index c9868f4..27d05da 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -27,8 +27,9 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat private CuratorFrameworkImpl client; private Watcher watcher; private WatcherType watcherType; + private boolean guaranteed; private boolean local; - private boolean quietly; + private boolean quietly; private Backgrounding backgrounding; public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client) @@ -36,6 +37,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat this.client = client; this.watcher = null; this.watcherType = WatcherType.Any; + this.guaranteed = false; this.local = false; this.quietly = false; this.backgrounding = new Backgrounding(); @@ -44,14 +46,26 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat @Override public RemoveWatchesType remove(Watcher watcher) { - this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher); + if(watcher == null) { + this.watcher = null; + } else { + //Try and get the namespaced version of the watcher. + this.watcher = client.getNamespaceWatcherMap().get(watcher); + + //If this is not present then default to the original watcher. This shouldn't happen in practice unless the user + //has added a watch directly to the ZK client rather than via the CuratorFramework. + if(this.watcher == null) { + this.watcher = watcher; + } + } + return this; } @Override public RemoveWatchesType remove(CuratorWatcher watcher) { - this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher); + this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher); return this; } @@ -111,6 +125,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat backgrounding = new Backgrounding(context); return this; } + + @Override + public RemoveWatchesLocal guaranteed() + { + guaranteed = true; + return this; + } @Override public BackgroundPathableQuietly<Void> locally() @@ -143,14 +164,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat return null; } - private void pathInBackground(String path) + private void pathInBackground(final String path) { - OperationAndData.ErrorCallback<String> errorCallback = null; + OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>() + { + @Override + public void retriesExhausted(OperationAndData<String> operationAndData) + { + client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher)); + } + }; client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null); } private void pathInForeground(final String path) throws Exception { + //For the local case we don't want to use the normal retry loop and we don't want to block until a connection is available. + //We just execute the removeWatch, and if it fails, ZK will just remove local watches. if(local) { ZooKeeper zkClient = client.getZooKeeper(); @@ -184,11 +214,21 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat zkClient.removeWatches(path, watcher, watcherType, local); } } - catch(KeeperException.NoWatcherException e) + catch(Exception e) { - //Swallow this exception if the quietly flag is set, otherwise rethrow. - if(!quietly) + if( RetryLoop.isRetryException(e) && guaranteed ) + { + //Setup the guaranteed handler + client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher)); + throw e; + } + else if(e instanceof KeeperException.NoWatcherException && quietly) + { + //Ignore + } + else { + //Rethrow throw e; } } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java index 6599745..943529f 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java @@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.imps.FailedDeleteManager.FailedDeleteManagerListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -291,11 +290,11 @@ public class TestFailedDeleteManager extends BaseClassForTests final AtomicBoolean pathAdded = new AtomicBoolean(false); - ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener() + ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>() { @Override - public void pathAddedForDelete(String path) + public void pathAddedForGuaranteedOperation(String path) { pathAdded.set(true); } @@ -325,11 +324,11 @@ public class TestFailedDeleteManager extends BaseClassForTests final AtomicBoolean pathAdded = new AtomicBoolean(false); - ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener() + ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>() { @Override - public void pathAddedForDelete(String path) + public void pathAddedForGuaranteedOperation(String path) { pathAdded.set(true); } http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index 0912c70..518f13b 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -11,6 +11,10 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.FailedRemoveWatchManager.FailedRemoveWatchDetails; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; @@ -438,6 +442,131 @@ public class TestRemoveWatches extends BaseClassForTests } } + @Test + public void testGuaranteedRemoveWatch() throws Exception { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + final CountDownLatch suspendedLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if(newState == ConnectionState.SUSPENDED) + { + suspendedLatch.countDown(); + } + else if(newState == ConnectionState.RECONNECTED) + { + reconnectedLatch.countDown(); + } + } + }); + + String path = "/"; + + CountDownLatch removeLatch = new CountDownLatch(1); + + Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); + client.checkExists().usingWatcher(watcher).forPath(path); + + server.stop(); + timing.awaitLatch(suspendedLatch); + + //Remove the watch while we're not connected + try + { + client.watches().remove(watcher).guaranteed().forPath(path); + Assert.fail(); + } + catch(KeeperException.ConnectionLossException e) + { + //Expected + } + + server.restart(); + + timing.awaitLatch(removeLatch); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testGuaranteedRemoveWatchInBackground() throws Exception { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), + new ExponentialBackoffRetry(100, 3)); + try + { + client.start(); + + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + final CountDownLatch suspendedLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if(newState == ConnectionState.SUSPENDED) + { + suspendedLatch.countDown(); + } + else if(newState == ConnectionState.RECONNECTED) + { + reconnectedLatch.countDown(); + } + } + }); + + final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1); + + ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>() + { + + @Override + public void pathAddedForGuaranteedOperation( + FailedRemoveWatchDetails detail) + { + guaranteeAddedLatch.countDown(); + } + }; + + String path = "/"; + + CountDownLatch removeLatch = new CountDownLatch(1); + + Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); + client.checkExists().usingWatcher(watcher).forPath(path); + + server.stop(); + timing.awaitLatch(suspendedLatch); + + //Remove the watch while we're not connected + client.watches().remove(watcher).guaranteed().inBackground().forPath(path); + + timing.awaitLatch(guaranteeAddedLatch); + + server.restart(); + + timing.awaitLatch(removeLatch); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + private static class CountDownWatcher implements Watcher { private String path; private EventType eventType;
