It would really be better if you put these comments into ReviewBoard, so that they appear both there and on the associated JIRA issue. The mailing list is a bad place for code reviews.
Thanks! Camille On Tue, Jan 17, 2012 at 4:31 PM, Robert Crocombe <[email protected]> wrote: > Well, I didn't get to look at it when I thought I would, and now I see > there's a ReviewBoard thing. I'd been working on some comments on and off: > perhaps they would be valuable in the sense that they're from someone with > little understanding of ZooKeeper's internals. While working through the > patch I also minimized it by removing whitespace-only changes and putting > the imports back in their original order (one file dropped out of the patch > as a result: it was only import order changes): I've attached that in case > it might be of some use. My comments are inline below. > > > diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java > b/src/java/main/org/apache/zookeeper/ClientCnxn.java > > --- a/src/java/main/org/apache/zookeeper/ClientCnxn.java > > +++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java > > @@ -53,6 +53,7 @@ import org.apache.zookeeper.Watcher.Even > > import org.apache.zookeeper.Watcher.Event.KeeperState; > > import org.apache.zookeeper.ZooDefs.OpCode; > > import org.apache.zookeeper.ZooKeeper.States; > > +import org.apache.zookeeper.ZooKeeper.WatchDeregistration; > > import org.apache.zookeeper.ZooKeeper.WatchRegistration; > > import org.apache.zookeeper.client.HostProvider; > > import org.apache.zookeeper.client.ZooKeeperSaslClient; > > @@ -203,6 +204,10 @@ public class ClientCnxn { > > return negotiatedSessionTimeout; > > } > > > > + public Object getEventThreadLock() { > > + return eventThreadLock; > > + } > > + > > @Override > > public String toString() { > > StringBuilder sb = new StringBuilder(); > > @@ -251,6 +256,8 @@ public class ClientCnxn { > > > > WatchRegistration watchRegistration; > > > > + WatchDeregistration watchDeregistration; > > + > > /** Convenience ctor */ > > Packet(RequestHeader requestHeader, ReplyHeader replyHeader, > > Record request, Record response, > > @@ -380,6 +387,10 @@ public class ClientCnxn { > > > 
> } > > > > + public void queueCallback(AsyncCallback cb, int rc, String path, > Object ctx) { > > + eventThread.queueCallback(cb, rc, path, ctx); > > + } > > + > > // used by ZooKeeperSaslClient.queueSaslPacket(). > > public void queuePacket(RequestHeader h, ReplyHeader r, Record > request, > > Record response, AsyncCallback cb) { > > @@ -424,6 +435,20 @@ public class ClientCnxn { > > } > > } > > > > + private static class LocalCallback { > > + private final AsyncCallback cb; > > + private final int rc; > > + private final String path; > > + private final Object ctx; > > + > > + public LocalCallback(AsyncCallback cb, int rc, String path, > Object ctx) { > > + this.cb = cb; > > + this.rc = rc; > > + this.path = path; > > + this.ctx = ctx; > > + } > > + } > > + > > /** > > * Guard against creating > "-EventThread-EventThread-EventThread-..." thread > > * names when ZooKeeper object is being created from within a > watcher. > > @@ -435,6 +460,8 @@ public class ClientCnxn { > > return name + suffix; > > } > > There should be documentation for what this lock synchronizes. I believe > the > concern is that ZKWatchManager.materialize() and ZKWatchManager.remove() > both > access the members dataWatches, existWatches, and childWatches. However, > within both materialize() and remove() there are individual synchronized > blocks protecting the maps individually. Is the concern therefore that you > need a lock across all of them simultaneously, or have I missed something? > If > not, mightn't it be better to simply make materialize() and remove() both > synchronized, or does that introduce some bottleneck (I don't see it)? > > I suppose this change introduces the minimum amount of synchronization > necessary (least change vs. existing behavior), but I guess I'd prefer to > see > it fixed for any potential use cases now rather than have the next person > discover belatedly that he/she needs to have this kind of locking. 
> > > + private final Object eventThreadLock = new Object(); > > + > > class EventThread extends Thread { > > private final LinkedBlockingQueue<Object> waitingEvents = > > new LinkedBlockingQueue<Object>(); > > @@ -454,6 +481,10 @@ public class ClientCnxn { > > setDaemon(true); > > } > > > > + public void queueCallback(AsyncCallback cb, int rc, String > path, Object ctx) { > > + waitingEvents.add(new LocalCallback(cb, rc, path, ctx)); > > + } > > + > > public void queueEvent(WatchedEvent event) { > > if (event.getType() == EventType.None > > && sessionState == event.getState()) { > > @@ -461,13 +492,15 @@ public class ClientCnxn { > > } > > sessionState = event.getState(); > > > > - // materialize the watchers based on the event > > - WatcherSetEventPair pair = new WatcherSetEventPair( > > - watcher.materialize(event.getState(), > event.getType(), > > - event.getPath()), > > - event); > > - // queue the pair (watch set & event) for later processing > > - waitingEvents.add(pair); > > + synchronized (eventThreadLock) { > > + // materialize the watchers based on the event > > + WatcherSetEventPair pair = new WatcherSetEventPair( > > + watcher.materialize(event.getState(), > event.getType(), > > + event.getPath()), > > + event); > > + // queue the pair (watch set & event) for later > processing > > + waitingEvents.add(pair); > > + } > > } > > > > public void queuePacket(Packet packet) { > > @@ -523,6 +556,23 @@ public class ClientCnxn { > > LOG.error("Error while calling watcher ", t); > > } > > } > > + } else if (event instanceof LocalCallback) { > > + LocalCallback lcb = (LocalCallback) event; > > + if (lcb.cb instanceof StatCallback) { > > + ((StatCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, null); > > + } else if (lcb.cb instanceof DataCallback) { > > + ((DataCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, null, null); > > + } else if (lcb.cb instanceof ACLCallback) { > > + ((ACLCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, 
null, null); > > + } else if (lcb.cb instanceof ChildrenCallback) { > > + ((ChildrenCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, null); > > + } else if (lcb.cb instanceof Children2Callback) { > > + ((Children2Callback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, null, null); > > + } else if (lcb.cb instanceof StringCallback) { > > + ((StringCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx, null); > > + } else { > > + ((VoidCallback)lcb.cb).processResult(lcb.rc, > lcb.path, lcb.ctx); > > + } > > Ugh. I suppose there's no help for it, though. > > > } else { > > Packet p = (Packet) event; > > int rc = 0; > > @@ -629,6 +679,9 @@ public class ClientCnxn { > > if (p.watchRegistration != null) { > > p.watchRegistration.register(p.replyHeader.getErr()); > > } > > + if (p.watchDeregistration != null) { > > + p.watchDeregistration.unregister(p.replyHeader.getErr()); > > + } > > > > if (p.cb == null) { > > synchronized (p) { > > @@ -1250,10 +1303,16 @@ public class ClientCnxn { > > > > public ReplyHeader submitRequest(RequestHeader h, Record request, > > Record response, WatchRegistration watchRegistration) > > + throws InterruptedException { > > + return submitRequest(h, request, response, watchRegistration, > null); > > + } > > + > > + public ReplyHeader submitRequest(RequestHeader h, Record request, > > + Record response, WatchRegistration watchRegistration, > WatchDeregistration watchDeregistration) > > throws InterruptedException { > > ReplyHeader r = new ReplyHeader(); > > Packet packet = queuePacket(h, r, request, response, null, > null, null, > > - null, watchRegistration); > > + null, watchRegistration, watchDeregistration); > > synchronized (packet) { > > while (!packet.finished) { > > packet.wait(); > > @@ -1266,6 +1325,14 @@ public class ClientCnxn { > > Record response, AsyncCallback cb, String clientPath, > > String serverPath, Object ctx, WatchRegistration > watchRegistration) > > { > > + return queuePacket(h, r, request, 
response, cb, clientPath, > serverPath, ctx, watchRegistration, null); > > + } > > + > > + Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, > > + Record response, AsyncCallback cb, String clientPath, > > + String serverPath, Object ctx, WatchRegistration > watchRegistration, > > + WatchDeregistration watchDeregistration) > > + { > > Packet packet = null; > > synchronized (outgoingQueue) { > > if (h.getType() != OpCode.ping && h.getType() != > OpCode.auth) { > > @@ -1276,6 +1343,7 @@ public class ClientCnxn { > > packet.ctx = ctx; > > packet.clientPath = clientPath; > > packet.serverPath = serverPath; > > + packet.watchDeregistration = watchDeregistration; > > if (!state.isAlive() || closing) { > > conLossPacket(packet); > > } else { > > diff --git a/src/java/main/org/apache/zookeeper/KeeperException.java > b/src/java/main/org/apache/zookeeper/KeeperException.java > > --- a/src/java/main/org/apache/zookeeper/KeeperException.java > > +++ b/src/java/main/org/apache/zookeeper/KeeperException.java > > @@ -131,7 +131,8 @@ public abstract class KeeperException ex > > return new SessionMovedException(); > > case NOTREADONLY: > > return new NotReadOnlyException(); > > - > > + case NOWATCHER: > > + return new NoWatcherException(); > > case OK: > > default: > > throw new IllegalArgumentException("Invalid exception > code"); > > @@ -345,7 +346,9 @@ public abstract class KeeperException ex > > /** Session moved to another server, so operation is ignored */ > > SESSIONMOVED (-118), > > /** State-changing request is passed to read-only server */ > > - NOTREADONLY (-119); > > + NOTREADONLY (-119), > > + /** Tried to remove inexistent watcher */ > > + NOWATCHER (-120); > > "nonexistent"? 
> > > > > private static final Map<Integer,Code> lookup > > = new HashMap<Integer,Code>(); > > @@ -422,6 +425,8 @@ public abstract class KeeperException ex > > return "Session moved"; > > case NOTREADONLY: > > return "Not a read-only call"; > > + case NOWATCHER: > > + return "No such watcher"; > > default: > > return "Unknown error " + code; > > } > > @@ -696,4 +701,13 @@ public abstract class KeeperException ex > > super(Code.UNIMPLEMENTED); > > } > > } > > + > > + /** > > + * @see Code#NOWATCHER > > + */ > > + public static class NoWatcherException extends KeeperException { > > + public NoWatcherException() { > > + super(Code.NOWATCHER); > > + } > > + } > > } > > diff --git a/src/java/main/org/apache/zookeeper/Watcher.java > b/src/java/main/org/apache/zookeeper/Watcher.java > > --- a/src/java/main/org/apache/zookeeper/Watcher.java > > +++ b/src/java/main/org/apache/zookeeper/Watcher.java > > @@ -145,5 +145,10 @@ public interface Watcher { > > } > > } > > > > + public enum WatcherType { > > + Children, > > + Data, > > + } > > + > > abstract public void process(WatchedEvent event); > > } > > diff --git a/src/java/main/org/apache/zookeeper/ZooDefs.java > b/src/java/main/org/apache/zookeeper/ZooDefs.java > > --- a/src/java/main/org/apache/zookeeper/ZooDefs.java > > +++ b/src/java/main/org/apache/zookeeper/ZooDefs.java > > @@ -54,6 +54,8 @@ public class ZooDefs { > > > > public final int multi = 14; > > > > + public final int removeWatches = 15; > > + > > public final int auth = 100; > > > > public final int setWatches = 101; > > diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java > b/src/java/main/org/apache/zookeeper/ZooKeeper.java > > --- a/src/java/main/org/apache/zookeeper/ZooKeeper.java > > +++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java > > @@ -18,8 +18,11 @@ > > > > package org.apache.zookeeper; > > > > +import java.util.concurrent.Exchanger; > > import org.apache.zookeeper.AsyncCallback.*; > > +import 
org.apache.zookeeper.KeeperException.Code; > > import org.apache.zookeeper.OpResult.ErrorResult; > > +import org.apache.zookeeper.Watcher.WatcherType; > > import org.apache.zookeeper.client.ConnectStringParser; > > import org.apache.zookeeper.client.HostProvider; > > import org.apache.zookeeper.client.StaticHostProvider; > > @@ -222,6 +225,108 @@ public class ZooKeeper { > > > > return result; > > } > > + > > + private boolean containsWatcher (Map<String,Set<Watcher>> > watches, Watcher watcher, String path) { > > + Set<Watcher> watchers = watches.get(path); > > + if (watchers == null) { > > + return false; > > + } > > + if (watcher != null) { > > + return watchers.contains(watcher); > > + } else { > > + return true; > > + } > > + } > > + > > + private boolean removeWatches (Map<String,Set<Watcher>> > watches, Watcher watcher, String path, boolean local) { > > + if (!local && watcher == null) { > > + // Remove all watchers > > + watches.remove(path); > > + return true; > > + } > > + Set<Watcher> watchers = watches.get(path); > > + if (local && (watcher == null || watchers.size() == 1)) { > > + return false; > > + } else { > > + watchers.remove(watcher); > > + return true; > > + } > > + } > > Please elaborate on why 'local' is necessary: I see that remove() below is > called with 'true' in ZooKeeper.removeWatches() and 'false' in > WatcherDeregistration.unregister(), but I decided I was too lazy to parse > the > semantics of the behavior difference in the two cases and why they are > necessary without any documentation. > > > + > > + /** > > + * Removes a watcher from the interest lists. 
> > + * > > + * @param clientPath path where the watcher is set > > + * @param watcher concrete watcher object or null for any > watcher set on the znode > > + * @param type watcher type > > + * @param local whether the call is local to the client or has > ben triggered by a server's message > > + * @return false if the removal can't be done local to the > client > > + * @throws KeeperException if no such watcher exists > > + */ > > + public boolean remove(String clientPath, Watcher watcher, > WatcherType type, boolean local) throws KeeperException { > > + boolean contains = false; > > + switch (type) { > > + case Children: { > > + synchronized(childWatches) { > > + contains = containsWatcher(childWatches, watcher, > clientPath); > > + if (contains) { > > + return removeWatches(childWatches, watcher, > clientPath, local); > > + } > > + } > > + break; > > + } > > + case Data: { > > + synchronized (dataWatches) { > > + contains = containsWatcher(dataWatches, watcher, > clientPath); > > + if (contains && removeWatches(dataWatches, watcher, > clientPath, local)) { > > + return true; > > + } > > + } > > + synchronized (existWatches) { > > + boolean contains_temp = > containsWatcher(existWatches, watcher, clientPath); > > + if (contains_temp && removeWatches(existWatches, > watcher, clientPath, local)) { > > + return true; > > + } > > + contains |= contains_temp; > > + } > > + } > > + } > > + if (!contains) { > > + throw KeeperException.create(Code.NOWATCHER, > clientPath); > > + } > > + return false; > > + } > > + } > > + > > + /** > > + * Unregisters a set of watchers for a particular path. 
> > + */ > > + class WatchDeregistration { > > + private Watcher watcher; > > + private WatcherType type; > > + private String clientPath; > > + public WatchDeregistration(Watcher watcher, WatcherType type, > String clientPath) > > + { > > + this.watcher = watcher; > > + this.type = type; > > + this.clientPath = clientPath; > > + } > > + > > + /** > > + * Unregisters the watcher with the set of watches on path. > > + * @param rc the result code of the operation that attempted to > > + * add the watch on the path. > > + */ > > Looks like copy/paste error from WatchRegistration. The "add" should be > "remove". > > > + public void unregister(int rc) { > > + if (rc != 0) > > + return; > > + try { > > + watchManager.remove(clientPath, watcher, type, false); > > + } catch (KeeperException e) { > > + // Shouldn't happen > > + throw new RuntimeException(e); > > + } > > + } > > } > > > > /** > > @@ -1629,6 +1734,118 @@ public class ZooKeeper { > > } > > > > /** > > + * For the given znode removes the specified watchers. > > + * <p> > > + * If watcher is null all watchers will be removed, otherwise only > the > > + * specified watcher will be deleted. > > + * <p> > > + * A successfull call guarantees that the removed watcher won't be > triggered. > > + * This call is also intended to avoid watch leaks both client and > serverside. > > + * <p> > > + * A KeeperException with error code KeeperException.NoWatcher will > be thrown > > + * if no watcher exists that match the specified parameters. > > + * > > + * TODO > > + * @since 3.X.X > > + * > > + * @param path > > + * @param watcher a concrete watcher or null to remove all > > + * @param type the type of watcher to be removed > > + * @throws InterruptedException If the server transaction is > interrupted. > > + * @throws KeeperException If the server signals an error with a > non-zero > > + * error code. 
> > + */ > > + public void removeWatches(String path, Watcher watcher, WatcherType > type) > > + throws InterruptedException, KeeperException > > + { > > + final Exchanger<Integer> exchanger = new Exchanger<Integer>(); > > + > > + VoidCallback cb = new VoidCallback() { > > + @Override > > + public void processResult(int rc, String path, Object ctx) { > > + while(true) { > > + try { > > + exchanger.exchange(rc); > > + return; > > + } catch (InterruptedException e) { > > + // needed? > > + Thread.currentThread().interrupt(); > > + } > > + } > > + } > > + }; > > So it looks like this loop is here to suppress InterruptedExceptions and > keep > trying the exchange() until it works. I believe this was done so that the > opposing thread in the exchange will not be left blocked in exchange() > forever (as you noted in the comment below). I thought I'd found a case > where > the loop was infinite if an exception was thrown before queueCallback() > could > be called, but it looks like you have that covered, too. Uhm, seems like > it'd > be good to document this since I spent about 10 minutes noodling about how > this all worked. I gotta say I'm not a fan of this weird two matching > loops > with exception suppression thing, but no alternative comes readily to mind. > > I do believe that the > > Thread.currentThread().interrupt() > > is necessary. > > > + > > + KeeperException ke = null; > > + > > + // Call the async version > > + try { > > + removeWatches(path, watcher, type, cb, null); > > + } catch (KeeperException e) { > > + // Can't rethrow yet, must call exchanger.exchange() to > unblock eventThread on processResult() > > + ke = e; > > + } > > + > > + int rc; > > + while(true) { > > + try { > > + rc = exchanger.exchange(0); > > + break; > > + } catch (InterruptedException e) { > > + // needed? 
> > + Thread.currentThread().interrupt(); > > + } > > + } > > + > > + if (ke != null) { > > + throw ke; > > + } > > + > > + if (rc != 0) { > > + throw KeeperException.create(KeeperException.Code.get(rc), > path); > > + } > > + } > > + > > + /** > > + * The asynchronous version of removeWatches. It is guaranteed that > the watcher won't > > + * be triggered after the callback is notified. > > + * > > + * @throws KeeperException > > + * > > + * @since 3.3.0 > > + * > > + * @see #removeWatches(String, Watcher, Watcher.WatcherType) > > + */ > > + public void removeWatches(String path, Watcher watcher, WatcherType > type, VoidCallback cb, > > + Object ctx) throws KeeperException > > + { > > + final String clientPath = path; > > + PathUtils.validatePath(clientPath); > > + > > + final String serverPath = prependChroot(clientPath); > > + > > + WatchDeregistration wcb = new WatchDeregistration(watcher, > type, clientPath); > > + > > + RequestHeader h = new RequestHeader(); > > + h.setType(ZooDefs.OpCode.removeWatches); > > + try { > > + synchronized (cnxn.getEventThreadLock()) { > > + if (watchManager.remove(clientPath, watcher, type, > true)) { > > + cnxn.queueCallback(cb, > KeeperException.Code.OK.intValue(), clientPath, ctx); > > + return; > > + } > > + } > > + } catch (KeeperException ke) { > > + cnxn.queueCallback(cb, ke.code().intValue(), clientPath, > ctx); > > + // TODO should we throw the exception or just return and > let the callback handle this case? > > + throw ke; > > + } > > + RemoveWatchesRequest request = new RemoveWatchesRequest(); > > + request.setPath(serverPath); > > + request.setType(type.ordinal()); > > + cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, > clientPath, serverPath, ctx, null, wcb); > > + } > > + > > + /** > > * Asynchronous sync. Flushes channel between process and leader. 
> > * @param path > > * @param cb a handler for the callback > > diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java > b/src/java/main/org/apache/zookeeper/server/DataTree.java > > --- a/src/java/main/org/apache/zookeeper/server/DataTree.java > > +++ b/src/java/main/org/apache/zookeeper/server/DataTree.java > > @@ -48,6 +48,7 @@ import org.apache.zookeeper.KeeperExcept > > import org.apache.zookeeper.Watcher.Event; > > import org.apache.zookeeper.Watcher.Event.EventType; > > import org.apache.zookeeper.Watcher.Event.KeeperState; > > +import org.apache.zookeeper.Watcher.WatcherType; > > import org.apache.zookeeper.ZooDefs.Ids; > > import org.apache.zookeeper.ZooDefs.OpCode; > > import org.apache.zookeeper.common.PathTrie; > > @@ -1275,4 +1276,15 @@ public class DataTree { > > } > > } > > } > > + > > + public void removeWatch(String path, WatcherType type, Watcher > watcher) { > > + switch (type) { > > + case Children: > > + this.childWatches.removeWatcher(path, watcher); > > + break; > > + case Data: > > + this.dataWatches.removeWatcher(path, watcher); > > + break; > > + } > > + } > > } > > diff --git > a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java > b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java > > --- > a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java > > +++ > b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java > > @@ -30,6 +30,7 @@ import org.apache.zookeeper.MultiRespons > > import org.apache.zookeeper.ZooDefs; > > import org.apache.zookeeper.KeeperException.Code; > > import org.apache.zookeeper.KeeperException.SessionMovedException; > > +import org.apache.zookeeper.Watcher.WatcherType; > > import org.apache.zookeeper.ZooDefs.OpCode; > > import org.apache.zookeeper.data.ACL; > > import org.apache.zookeeper.data.Stat; > > @@ -44,6 +45,7 @@ import org.apache.zookeeper.proto.GetChi > > import org.apache.zookeeper.proto.GetChildrenResponse; > > import 
org.apache.zookeeper.proto.GetDataRequest; > > import org.apache.zookeeper.proto.GetDataResponse; > > +import org.apache.zookeeper.proto.RemoveWatchesRequest; > > import org.apache.zookeeper.proto.ReplyHeader; > > import org.apache.zookeeper.proto.SetACLResponse; > > import org.apache.zookeeper.proto.SetDataResponse; > > @@ -312,6 +314,14 @@ public class FinalRequestProcessor imple > > setWatches.getChildWatches(), cnxn); > > break; > > } > > + case OpCode.removeWatches: { > > + lastOp = "REMW"; > > + RemoveWatchesRequest removeWatches = new > RemoveWatchesRequest(); > > + > ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches); > > + WatcherType type = > WatcherType.values()[removeWatches.getType()]; > > + > zks.getZKDatabase().removeWatch(removeWatches.getPath(), type, cnxn); > > + break; > > + } > > case OpCode.getACL: { > > lastOp = "GETA"; > > GetACLRequest getACLRequest = new GetACLRequest(); > > diff --git a/src/java/main/org/apache/zookeeper/server/Request.java > b/src/java/main/org/apache/zookeeper/server/Request.java > > --- a/src/java/main/org/apache/zookeeper/server/Request.java > > +++ b/src/java/main/org/apache/zookeeper/server/Request.java > > @@ -132,6 +132,7 @@ public class Request { > > case OpCode.ping: > > case OpCode.closeSession: > > case OpCode.setWatches: > > + case OpCode.removeWatches: > > return true; > > default: > > return false; > > @@ -169,6 +170,8 @@ public class Request { > > return "create"; > > case OpCode.setWatches: > > return "setWatches"; > > + case OpCode.removeWatches: > > + return "removeWatches"; > > case OpCode.delete: > > return "delete"; > > case OpCode.exists: > > diff --git a/src/java/main/org/apache/zookeeper/server/WatchManager.java > b/src/java/main/org/apache/zookeeper/server/WatchManager.java > > --- a/src/java/main/org/apache/zookeeper/server/WatchManager.java > > +++ b/src/java/main/org/apache/zookeeper/server/WatchManager.java > > @@ -88,6 +88,22 @@ class WatchManager { > > } > > } > > > > + 
synchronized void removeWatcher(String path, Watcher watcher) { > > + HashSet<String> paths = watch2Paths.get(watcher); > > + if (paths == null) { > > + return; > > + } > > + paths.remove(path); > > + HashSet<Watcher> list = watchTable.get(path); > > + if (list == null) { > > + return; > > + } > > + list.remove(watcher); > > + if (list.size() == 0) { > > + watchTable.remove(path); > > + } > > + } > > + > > Set<Watcher> triggerWatch(String path, EventType type) { > > return triggerWatch(path, type, null); > > } > > diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java > b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java > > --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java > > +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java > > @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; > > import org.apache.zookeeper.KeeperException; > > import org.apache.zookeeper.KeeperException.NoNodeException; > > import org.apache.zookeeper.Watcher; > > +import org.apache.zookeeper.Watcher.WatcherType; > > import org.apache.zookeeper.data.ACL; > > import org.apache.zookeeper.data.Stat; > > import org.apache.zookeeper.server.DataTree.ProcessTxnResult; > > @@ -373,6 +374,17 @@ public class ZKDatabase { > > } > > > > /** > > + * remove watch from the datatree > > + * > > + * @param path node to remove watches from > > + * @param type the type of watcher to remove > > + * @param watcher the watcher function to remove > > + */ > > + public void removeWatch(String path, WatcherType type, Watcher > watcher) { > > + dataTree.removeWatch(path, type, watcher); > > + } > > + > > + /** > > * get acl for a path > > * @param path the path to query for acl > > * @param stat the stat for the node > > diff --git > a/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java > b/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java > > new file mode 100644 > > --- /dev/null > > +++ 
b/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java > > @@ -0,0 +1,231 @@ > > +/** > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. > > + */ > > + > > +package org.apache.zookeeper.test; > > + > > +import java.io.IOException; > > +import java.util.concurrent.CountDownLatch; > > +import java.util.concurrent.TimeUnit; > > + > > +import org.apache.zookeeper.AsyncCallback; > > +import org.apache.zookeeper.CreateMode; > > +import org.apache.zookeeper.KeeperException; > > +import org.apache.zookeeper.WatchedEvent; > > +import org.apache.zookeeper.Watcher; > > +import org.apache.zookeeper.Watcher.WatcherType; > > +import org.apache.zookeeper.ZooDefs.Ids; > > +import org.apache.zookeeper.ZooKeeper; > > +import org.junit.Assert; > > +import org.junit.Test; > > + > > +public class RemoveWatchesTest extends ClientBase { > > + private class MyWatcher implements Watcher { > > + private final String path; > > + private String eventPath; > > + private CountDownLatch latch = new CountDownLatch(1); > > + > > + public MyWatcher(String path) { > > + this.path = path; > > + } > > + > > + public void process(WatchedEvent event) { > > + System.out.println("latch:" + 
path + " " + event.getPath()); > > + this.eventPath = event.getPath(); > > + latch.countDown(); > > + } > > + > > + public boolean matches() throws InterruptedException { > > + if (!latch.await(CONNECTION_TIMEOUT / 30, > TimeUnit.MILLISECONDS)) { > > + return false; > > + } > > When I first saw this, I was concerned that there could be false test > failures > due to the await() timing out before process() would be called. However, I > think I was wrong about this because you always seem to call matches() on > a > ZooKeeper client on which you've first called close(), which I presume > means > that all pending watcher notifications must have been delivered. Is this > what > you have found or know to be true? I think it's worth documenting this. > > > + return path.equals(eventPath); > > + } > > + } > > + > > + private class MyCallback implements AsyncCallback.VoidCallback { > > + private final String path; > > + private final int rc; > > + private String eventPath; > > + private int eventRc; > > + private CountDownLatch latch = new CountDownLatch(1); > > + > > + public MyCallback(int rc, String path) { > > + this.rc = rc; > > + this.path = path; > > + } > > + > > + @Override > > + public void processResult(int rc, String eventPath, Object ctx) > { > > + System.out.println("latch:" + path + " " + eventPath); > > + this.eventPath = eventPath; > > + this.eventRc = rc; > > + this.latch.countDown(); > > + } > > + > > + public boolean matches() throws InterruptedException { > > + if (!latch.await(CONNECTION_TIMEOUT / 60, > TimeUnit.MILLISECONDS)) { > > + return false; > > + } > > + return path.equals(eventPath) && rc == eventRc; > > + } > > + } > > + > > + @Test > > + public void testRemoveWatcher() throws IOException, > InterruptedException, KeeperException { > > + ZooKeeper zk1 = createClient(); > > + ZooKeeper zk2 = createClient(); > > + try { > > + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + zk1.create("/node2", null, 
Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + MyWatcher w1 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w1)); > > + MyWatcher w2 = new MyWatcher("/node2"); > > + Assert.assertNotNull(zk2.exists("/node2", w2)); > > + zk2.removeWatches("/node2", w2, WatcherType.Data); > > + try { > > + zk2.removeWatches("/node2", w2, WatcherType.Children); > > + Assert.fail("Didn't complain about unexisting > watcher."); > > + } catch (KeeperException e) { > > Maybe catch NoWatcherException specifically? > > > + // should always happen > > + } > > + if (zk1 != null) { > > + zk1.close(); > > + zk1 = null; > > + } > > + Assert.assertTrue(w1.matches()); > > + Assert.assertFalse(w2.matches()); > > + Assert.assertNull(zk2.exists("/node1", false)); > > + Assert.assertNull(zk2.exists("/node2", false)); > > + } finally { > > + if (zk1 != null) > > + zk1.close(); > > + if (zk2 != null) > > + zk2.close(); > > + } > > + } > > + > > + @Test > > + public void testMultipleWatchers() throws IOException, > InterruptedException, KeeperException { > > + ZooKeeper zk1 = createClient(); > > + ZooKeeper zk2 = createClient(); > > + try { > > + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + MyWatcher w1 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w1)); > > + MyWatcher w2 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w2)); > > + zk2.removeWatches("/node1", w2, WatcherType.Data); > > + if (zk1 != null) { > > + zk1.close(); > > + zk1 = null; > > + } > > + Assert.assertTrue(w1.matches()); > > + Assert.assertFalse(w2.matches()); > > + Assert.assertNull(zk2.exists("/node1", false)); > > + } finally { > > + if (zk1 != null) > > + zk1.close(); > > + if (zk2 != null) > > + zk2.close(); > > + } > > + } > > + > > + @Test > > + public void testMultipleTypes() throws IOException, > InterruptedException, KeeperException { > > + ZooKeeper zk1 = createClient(); > > + ZooKeeper zk2 = 
createClient(); > > + try { > > + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.PERSISTENT); > > + MyWatcher w1 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w1)); > > + Assert.assertNotNull(zk2.getChildren("/node1", w1)); > > + zk2.removeWatches("/node1", w1, WatcherType.Data); > > + zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + Assert.assertTrue(w1.matches()); > > + Assert.assertNotNull(zk2.exists("/node1", false)); > > + Assert.assertNotNull(zk2.exists("/node1/child", false)); > > + } finally { > > + if (zk1 != null) > > + zk1.close(); > > + if (zk2 != null) > > + zk2.close(); > > + } > > + } > > + > > + @Test > > + public void testRemoveAll() throws IOException, > InterruptedException, KeeperException { > > + ZooKeeper zk1 = createClient(); > > + ZooKeeper zk2 = createClient(); > > + try { > > + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + MyWatcher w1 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w1)); > > + MyWatcher w2 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w2)); > > + zk2.removeWatches("/node1", null, WatcherType.Data); > > + if (zk1 != null) { > > + zk1.close(); > > + zk1 = null; > > + } > > + Assert.assertFalse(w1.matches()); > > + Assert.assertFalse(w2.matches()); > > + Assert.assertNull(zk2.exists("/node1", false)); > > + } finally { > > + if (zk1 != null) > > + zk1.close(); > > + if (zk2 != null) > > + zk2.close(); > > + } > > + } > > + > > + @Test > > + public void testRemoveAsync() throws IOException, > InterruptedException, KeeperException { > > + ZooKeeper zk1 = createClient(); > > + ZooKeeper zk2 = createClient(); > > + try { > > + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, > CreateMode.EPHEMERAL); > > + MyCallback c1 = new > MyCallback(KeeperException.Code.NOWATCHER.intValue(), "/node1"); > > + try { > > + zk1.removeWatches("/node1", null, WatcherType.Children, > 
c1, null); > > + Assert.fail("Didn't raise exception"); > > + } catch (KeeperException ke) { > > + Assert.assertEquals(KeeperException.Code.NOWATCHER, > ke.code()); > > + } > > + Assert.assertTrue(c1.matches()); > > + MyWatcher w1 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w1)); > > + MyWatcher w2 = new MyWatcher("/node1"); > > + Assert.assertNotNull(zk2.exists("/node1", w2)); > > + MyCallback c2 = new > MyCallback(KeeperException.Code.OK.intValue(), "/node1"); > > + zk2.removeWatches("/node1", null, WatcherType.Data, c2, > null); > > + Assert.assertTrue(c2.matches()); > > + if (zk1 != null) { > > + zk1.close(); > > + zk1 = null; > > + } > > + Assert.assertFalse(w1.matches()); > > + Assert.assertFalse(w2.matches()); > > + Assert.assertNull(zk2.exists("/node1", false)); > > + } finally { > > + if (zk1 != null) > > + zk1.close(); > > + if (zk2 != null) > > + zk2.close(); > > + } > > + } > > +} > > diff --git a/src/zookeeper.jute b/src/zookeeper.jute > > --- a/src/zookeeper.jute > > +++ b/src/zookeeper.jute > > @@ -145,6 +145,10 @@ module org.apache.zookeeper.proto { > > ustring path; > > int max; > > } > > + class RemoveWatchesRequest { > > + ustring path; > > + int type; > > + } > > class SyncRequest { > > ustring path; > > } > > -- > Robert Crocombe > >
