CURATOR-161 - Initial cut of remove watches functionality. This provides the ability to remove watches, but does not yet provide a framework for observers being notified when a watch has been removed.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9ff9ccd2 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9ff9ccd2 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9ff9ccd2 Branch: refs/heads/CURATOR-3.0 Commit: 9ff9ccd23c8d033b2e7d72b83a0183d05f5dd685 Parents: d4883a8 Author: Cameron McKenzie <[email protected]> Authored: Tue Dec 2 09:16:40 2014 +1100 Committer: Cameron McKenzie <[email protected]> Committed: Tue Dec 2 09:18:40 2014 +1100 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 471 ++++++++++--------- .../curator/framework/api/CuratorEventType.java | 165 +++---- .../framework/api/RemoveWatchesBuilder.java | 29 ++ .../framework/api/RemoveWatchesLocal.java | 18 + .../framework/api/RemoveWatchesType.java | 19 + .../framework/imps/CuratorFrameworkImpl.java | 8 +- .../imps/RemoveWatchesBuilderImpl.java | 192 ++++++++ .../framework/imps/TestRemoveWatches.java | 218 +++++++++ 8 files changed, 806 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 9c23ddb..9d1039a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -1,233 +1,238 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework; - -import org.apache.curator.CuratorZookeeperClient; -import org.apache.curator.framework.api.*; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.EnsurePath; -import org.apache.zookeeper.Watcher; - -import java.io.Closeable; -import java.util.concurrent.TimeUnit; - -/** - * Zookeeper framework-style client - */ -public interface CuratorFramework extends Closeable -{ - /** - * Start the client. Most mutator methods will not work until the client is started - */ - public void start(); - - /** - * Stop the client - */ - public void close(); - - /** - * Returns the state of this instance - * - * @return state - */ - public CuratorFrameworkState getState(); - - /** - * Return true if the client is started, not closed, etc. 
- * - * @return true/false - * @deprecated use {@link #getState()} instead - */ - public boolean isStarted(); - - /** - * Start a create builder - * - * @return builder object - */ - public CreateBuilder create(); - - /** - * Start a delete builder - * - * @return builder object - */ - public DeleteBuilder delete(); - - /** - * Start an exists builder - * <p> - * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called. Thus, a null - * means that it does not exist and an actual Stat object means it does exist. - * - * @return builder object - */ - public ExistsBuilder checkExists(); - - /** - * Start a get data builder - * - * @return builder object - */ - public GetDataBuilder getData(); - - /** - * Start a set data builder - * - * @return builder object - */ - public SetDataBuilder setData(); - - /** - * Start a get children builder - * - * @return builder object - */ - public GetChildrenBuilder getChildren(); - - /** - * Start a get ACL builder - * - * @return builder object - */ - public GetACLBuilder getACL(); - - /** - * Start a set ACL builder - * - * @return builder object - */ - public SetACLBuilder setACL(); - - /** - * Start a transaction builder - * - * @return builder object - */ - public CuratorTransaction inTransaction(); - - /** - * Perform a sync on the given path - syncs are always in the background - * - * @param path the path - * @param backgroundContextObject optional context - * @deprecated use {@link #sync()} instead - */ - public void sync(String path, Object backgroundContextObject); - - /** - * Start a sync builder. 
Note: sync is ALWAYS in the background even - * if you don't use one of the background() methods - * - * @return builder object - */ - public SyncBuilder sync(); - - /** - * Returns the listenable interface for the Connect State - * - * @return listenable - */ - public Listenable<ConnectionStateListener> getConnectionStateListenable(); - - /** - * Returns the listenable interface for events - * - * @return listenable - */ - public Listenable<CuratorListener> getCuratorListenable(); - - /** - * Returns the listenable interface for unhandled errors - * - * @return listenable - */ - public Listenable<UnhandledErrorListener> getUnhandledErrorListenable(); - - /** - * Returns a facade of the current instance that does _not_ automatically - * pre-pend the namespace to all paths - * - * @return facade - * @deprecated use {@link #usingNamespace} passing <code>null</code> - */ - public CuratorFramework nonNamespaceView(); - - /** - * Returns a facade of the current instance that uses the specified namespace - * or no namespace if <code>newNamespace</code> is <code>null</code>. - * - * @param newNamespace the new namespace or null for none - * @return facade - */ - public CuratorFramework usingNamespace(String newNamespace); - - /** - * Return the current namespace or "" if none - * - * @return namespace - */ - public String getNamespace(); - - /** - * Return the managed zookeeper client - * - * @return client - */ - public CuratorZookeeperClient getZookeeperClient(); - - /** - * Allocates an ensure path instance that is namespace aware - * - * @param path path to ensure - * @return new EnsurePath instance - */ - public EnsurePath newNamespaceAwareEnsurePath(String path); - - /** - * Curator can hold internal references to watchers that may inhibit garbage collection. - * Call this method on watchers you are no longer interested in. 
- * - * @param watcher the watcher - */ - public void clearWatcherReferences(Watcher watcher); - - /** - * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded - * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely - * @param units The time units for the maximum wait time. - * @return True if connection has been established, false otherwise. - * @throws InterruptedException If interrupted while waiting - */ - public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException; - - /** - * Block until a connection to ZooKeeper is available. This method will not return until a - * connection is available or it is interrupted, in which case an InterruptedException will - * be thrown - * @throws InterruptedException If interrupted while waiting - */ - public void blockUntilConnected() throws InterruptedException; -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework; + +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.framework.api.*; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.Watcher; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +/** + * Zookeeper framework-style client + */ +public interface CuratorFramework extends Closeable +{ + /** + * Start the client. Most mutator methods will not work until the client is started + */ + public void start(); + + /** + * Stop the client + */ + public void close(); + + /** + * Returns the state of this instance + * + * @return state + */ + public CuratorFrameworkState getState(); + + /** + * Return true if the client is started, not closed, etc. + * + * @return true/false + * @deprecated use {@link #getState()} instead + */ + public boolean isStarted(); + + /** + * Start a create builder + * + * @return builder object + */ + public CreateBuilder create(); + + /** + * Start a delete builder + * + * @return builder object + */ + public DeleteBuilder delete(); + + /** + * Start an exists builder + * <p> + * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called. Thus, a null + * means that it does not exist and an actual Stat object means it does exist. 
+ * + * @return builder object + */ + public ExistsBuilder checkExists(); + + /** + * Start a get data builder + * + * @return builder object + */ + public GetDataBuilder getData(); + + /** + * Start a set data builder + * + * @return builder object + */ + public SetDataBuilder setData(); + + /** + * Start a get children builder + * + * @return builder object + */ + public GetChildrenBuilder getChildren(); + + /** + * Start a get ACL builder + * + * @return builder object + */ + public GetACLBuilder getACL(); + + /** + * Start a set ACL builder + * + * @return builder object + */ + public SetACLBuilder setACL(); + + /** + * Start a transaction builder + * + * @return builder object + */ + public CuratorTransaction inTransaction(); + + /** + * Perform a sync on the given path - syncs are always in the background + * + * @param path the path + * @param backgroundContextObject optional context + * @deprecated use {@link #sync()} instead + */ + public void sync(String path, Object backgroundContextObject); + + /** + * Start a sync builder. Note: sync is ALWAYS in the background even + * if you don't use one of the background() methods + * + * @return builder object + */ + public SyncBuilder sync(); + + /** + * Start a remove watches builder. 
+ * @return builder object + */ + public RemoveWatchesBuilder removeWatches(); + + /** + * Returns the listenable interface for the Connect State + * + * @return listenable + */ + public Listenable<ConnectionStateListener> getConnectionStateListenable(); + + /** + * Returns the listenable interface for events + * + * @return listenable + */ + public Listenable<CuratorListener> getCuratorListenable(); + + /** + * Returns the listenable interface for unhandled errors + * + * @return listenable + */ + public Listenable<UnhandledErrorListener> getUnhandledErrorListenable(); + + /** + * Returns a facade of the current instance that does _not_ automatically + * pre-pend the namespace to all paths + * + * @return facade + * @deprecated use {@link #usingNamespace} passing <code>null</code> + */ + public CuratorFramework nonNamespaceView(); + + /** + * Returns a facade of the current instance that uses the specified namespace + * or no namespace if <code>newNamespace</code> is <code>null</code>. + * + * @param newNamespace the new namespace or null for none + * @return facade + */ + public CuratorFramework usingNamespace(String newNamespace); + + /** + * Return the current namespace or "" if none + * + * @return namespace + */ + public String getNamespace(); + + /** + * Return the managed zookeeper client + * + * @return client + */ + public CuratorZookeeperClient getZookeeperClient(); + + /** + * Allocates an ensure path instance that is namespace aware + * + * @param path path to ensure + * @return new EnsurePath instance + */ + public EnsurePath newNamespaceAwareEnsurePath(String path); + + /** + * Curator can hold internal references to watchers that may inhibit garbage collection. + * Call this method on watchers you are no longer interested in. 
+ * + * @param watcher the watcher + */ + public void clearWatcherReferences(Watcher watcher); + + /** + * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded + * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely + * @param units The time units for the maximum wait time. + * @return True if connection has been established, false otherwise. + * @throws InterruptedException If interrupted while waiting + */ + public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException; + + /** + * Block until a connection to ZooKeeper is available. This method will not return until a + * connection is available or it is interrupted, in which case an InterruptedException will + * be thrown + * @throws InterruptedException If interrupted while waiting + */ + public void blockUntilConnected() throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 684d11b..480d5ec 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -1,80 +1,85 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.api; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.Watcher; - -public enum CuratorEventType -{ - /** - * Corresponds to {@link CuratorFramework#create()} - */ - CREATE, - - /** - * Corresponds to {@link CuratorFramework#delete()} - */ - DELETE, - - /** - * Corresponds to {@link CuratorFramework#checkExists()} - */ - EXISTS, - - /** - * Corresponds to {@link CuratorFramework#getData()} - */ - GET_DATA, - - /** - * Corresponds to {@link CuratorFramework#setData()} - */ - SET_DATA, - - /** - * Corresponds to {@link CuratorFramework#getChildren()} - */ - CHILDREN, - - /** - * Corresponds to {@link CuratorFramework#sync(String, Object)} - */ - SYNC, - - /** - * Corresponds to {@link CuratorFramework#getACL()} - */ - GET_ACL, - - /** - * Corresponds to {@link CuratorFramework#setACL()} - */ - SET_ACL, - - /** - * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()} - */ - WATCHED, - - /** - * Event sent when client is being closed - */ - CLOSING -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.Watcher; + +public enum CuratorEventType +{ + /** + * Corresponds to {@link CuratorFramework#create()} + */ + CREATE, + + /** + * Corresponds to {@link CuratorFramework#delete()} + */ + DELETE, + + /** + * Corresponds to {@link CuratorFramework#checkExists()} + */ + EXISTS, + + /** + * Corresponds to {@link CuratorFramework#getData()} + */ + GET_DATA, + + /** + * Corresponds to {@link CuratorFramework#setData()} + */ + SET_DATA, + + /** + * Corresponds to {@link CuratorFramework#getChildren()} + */ + CHILDREN, + + /** + * Corresponds to {@link CuratorFramework#sync(String, Object)} + */ + SYNC, + + /** + * Corresponds to {@link CuratorFramework#getACL()} + */ + GET_ACL, + + /** + * Corresponds to {@link CuratorFramework#setACL()} + */ + SET_ACL, + + /** + * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()} + */ + WATCHED, + + /** + * Corresponds to {@link CuratorFramework#removeWatches()} + */ + REMOVE_WATCHES, + + /** + * Event sent when client is being closed + */ + CLOSING +} http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java 
b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java new file mode 100644 index 0000000..2ed3c05 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java @@ -0,0 +1,29 @@ +package org.apache.curator.framework.api; + +import org.apache.zookeeper.Watcher; + +/** + * Builder to allow watches to be removed + */ +public interface RemoveWatchesBuilder +{ + /** + * Specify the watcher to be removed + * @param watcher + * @return + */ + public RemoveWatchesType watcher(Watcher watcher); + + /** + * Specify the watcher to be removed + * @param watcher + * @return + */ + public RemoveWatchesType watcher(CuratorWatcher watcher); + + /** + * Specify that all watches should be removed + * @return + */ + public RemoveWatchesType allWatches(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java new file mode 100644 index 0000000..d54638c --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java @@ -0,0 +1,18 @@ +package org.apache.curator.framework.api; + +/** + * Builder to allow the specification of whether it is acceptable to remove client side watch information + * in the case where ZK cannot be contacted. + */ +public interface RemoveWatchesLocal extends BackgroundPathable<Void> +{ + + /** + * Specify if the client should just remove client side watches if a connection to ZK + * is not available. 
+ * @param local + * @return + */ + public BackgroundPathable<Void> local(boolean local); + +} http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java new file mode 100644 index 0000000..3c58b7b --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java @@ -0,0 +1,19 @@ +package org.apache.curator.framework.api; + +import org.apache.zookeeper.Watcher.WatcherType; + +/** + * Builder to allow the specification of the type of watch to be removed + * (see {@link WatcherType}). + */ +public interface RemoveWatchesType +{ + + /** + * Specify the type of watcher to be removed. 
+ * @param watcherType + * @return + */ + public RemoveWatchesLocal ofType(WatcherType watcherType); + +} http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index cf38e21..b9614ee 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -453,6 +453,12 @@ public class CuratorFrameworkImpl implements CuratorFramework { return new SyncBuilderImpl(this); } + + @Override + public RemoveWatchesBuilder removeWatches() + { + return new RemoveWatchesBuilderImpl(this); + } protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) { @@ -471,7 +477,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { return namespace.newNamespaceAwareEnsurePath(path); } - + ACLProvider getAclProvider() { return aclProvider; http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java new file mode 100644 index 0000000..08f0791 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -0,0 +1,192 @@ +package org.apache.curator.framework.imps; + +import java.util.concurrent.Callable; +import 
java.util.concurrent.Executor; + +import org.apache.curator.RetryLoop; +import org.apache.curator.TimeTrace; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.BackgroundPathable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.RemoveWatchesLocal; +import org.apache.curator.framework.api.RemoveWatchesBuilder; +import org.apache.curator.framework.api.RemoveWatchesType; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.WatcherType; +import org.apache.zookeeper.ZooKeeper; + + +public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String> +{ + private CuratorFrameworkImpl client; + private Watcher watcher; + private WatcherType watcherType; + private boolean local; + private Backgrounding backgrounding; + + public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + this.watcher = null; + this.watcherType = null; + this.local = false; + this.backgrounding = new Backgrounding(); + } + + @Override + public RemoveWatchesType watcher(Watcher watcher) + { + this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher); + return this; + } + + @Override + public RemoveWatchesType watcher(CuratorWatcher watcher) + { + this.watcher = watcher == null ? 
null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher); + return this; + } + + @Override + public RemoveWatchesType allWatches() + { + this.watcher = null; + return this; + } + + @Override + public RemoveWatchesLocal ofType(WatcherType watcherType) + { + this.watcherType = watcherType; + + return this; + } + + @Override + public Pathable<Void> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Pathable<Void> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(client, callback, executor); + return this; + } + + @Override + public Pathable<Void> inBackground() + { + backgrounding = new Backgrounding(true); + return this; + } + + @Override + public Pathable<Void> inBackground(Object context) + { + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public BackgroundPathable<Void> local(boolean local) + { + this.local = local; + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + final String adjustedPath = client.fixForNamespace(path); + + if(backgrounding.inBackground()) + { + pathInBackground(adjustedPath); + } + else + { + pathInForeground(adjustedPath); + } + + return null; + } + + private void pathInBackground(String path) + { + OperationAndData.ErrorCallback<String> errorCallback = null; + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null); + } + + private void 
pathInForeground(final String path) throws Exception + { + RetryLoop.callWithRetry(client.getZookeeperClient(), + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + ZooKeeper zkClient = client.getZooKeeper(); + if(watcher == null) + { + zkClient.removeAllWatches(path, watcherType, local); + } + else + { + zkClient.removeWatches(path, watcher, watcherType, local); + } + + return null; + } + }); + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> operationAndData) + throws Exception + { + final TimeTrace trace = client.getZookeeperClient().startTracer("RemoteWatches-Background"); + + AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback() + { + @Override + public void processResult(int rc, String path, Object ctx) + { + trace.commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null); + client.processBackgroundOperation(operationAndData, event); + } + }; + + ZooKeeper zkClient = client.getZooKeeper(); + if(watcher == null) + { + zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); + } + else + { + zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext()); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java new file mode 100644 index 0000000..d7e8886 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -0,0 +1,218 @@ 
+package org.apache.curator.framework.imps; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.WatcherType; +import org.apache.zookeeper.ZooKeeper; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestRemoveWatches extends BaseClassForTests +{ + @Test + public void testRemoveCuratorWatch() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + CuratorWatcher watcher = new CuratorWatcher() + { + + @Override + public void process(WatchedEvent event) throws Exception + { + // TODO Auto-generated method stub + + } + }; + + String path = "/"; + client.checkExists().usingWatcher(watcher).forPath(path); + + client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(path); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRemoveWatch() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); + try + { + client.start(); + + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + } + }; + + String path = "/"; + client.checkExists().usingWatcher(watcher).forPath(path); + + client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(path); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRemoveWatchInBackgroundWithCallback() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + } + }; + + final CountDownLatch removedLatch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() + { + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) + throws Exception + { + removedLatch.countDown(); + } + }; + + String path = "/"; + client.checkExists().usingWatcher(watcher).forPath(path); + + client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path); + + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); + + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRemoveWatchInBackgroundWithNoCallback() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); + try + { + client.start(); + + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + } + }; + + final CountDownLatch removedLatch = new CountDownLatch(1); + client.getCuratorListenable().addListener(new CuratorListener() + { + + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) + throws Exception + { + removedLatch.countDown(); + } + }); + + String path = "/"; + client.checkExists().usingWatcher(watcher).forPath(path); + + client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground().forPath(path); + + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); + + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRemoveAllWatches() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + Watcher watcher1 = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + } + }; + + Watcher watcher2 = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + } + }; + + String path = "/"; + client.checkExists().usingWatcher(watcher1).forPath(path); + client.checkExists().usingWatcher(watcher2).forPath(path); + + client.removeWatches().allWatches().ofType(WatcherType.Any).forPath(path); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } +}
