It would really be better if you put these comments into reviewboard, so
they will appear there and on the associated JIRA. The mailing list is a
bad place for code reviews.

Thanks!
Camille

On Tue, Jan 17, 2012 at 4:31 PM, Robert Crocombe <[email protected]> wrote:

> Well, I didn't get to look at it when I thought I would, and now I see
> there's a ReviewBoard thing.  I'd been working on some comments on and off:
> perhaps they would be valuable in the sense that they're from someone with
> little understanding of ZooKeeper's internals.  While working through the
> patch I also minimized it by removing whitespace-only changes and putting
> the imports back in their original order (one file dropped out of the patch
> as a result: it was only import order changes): I've attached that in case
> it might be of some use.  My comments are inline below.
>
> > diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java
> b/src/java/main/org/apache/zookeeper/ClientCnxn.java
> > --- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
> > +++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
> > @@ -53,6 +53,7 @@ import org.apache.zookeeper.Watcher.Even
> >  import org.apache.zookeeper.Watcher.Event.KeeperState;
> >  import org.apache.zookeeper.ZooDefs.OpCode;
> >  import org.apache.zookeeper.ZooKeeper.States;
> > +import org.apache.zookeeper.ZooKeeper.WatchDeregistration;
> >  import org.apache.zookeeper.ZooKeeper.WatchRegistration;
> >  import org.apache.zookeeper.client.HostProvider;
> >  import org.apache.zookeeper.client.ZooKeeperSaslClient;
> > @@ -203,6 +204,10 @@ public class ClientCnxn {
> >          return negotiatedSessionTimeout;
> >      }
> >
> > +    public Object getEventThreadLock() {
> > +        return eventThreadLock;
> > +    }
> > +
> >      @Override
> >      public String toString() {
> >          StringBuilder sb = new StringBuilder();
> > @@ -251,6 +256,8 @@ public class ClientCnxn {
> >
> >          WatchRegistration watchRegistration;
> >
> > +        WatchDeregistration watchDeregistration;
> > +
> >          /** Convenience ctor */
> >          Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
> >                 Record request, Record response,
> > @@ -380,6 +387,10 @@ public class ClientCnxn {
> >
> >      }
> >
> > +    public void queueCallback(AsyncCallback cb, int rc, String path,
> Object ctx) {
> > +        eventThread.queueCallback(cb, rc, path, ctx);
> > +    }
> > +
> >      // used by ZooKeeperSaslClient.queueSaslPacket().
> >      public void queuePacket(RequestHeader h, ReplyHeader r, Record
> request,
> >              Record response, AsyncCallback cb) {
> > @@ -424,6 +435,20 @@ public class ClientCnxn {
> >          }
> >      }
> >
> > +    private static class LocalCallback {
> > +        private final AsyncCallback cb;
> > +        private final int rc;
> > +        private final String path;
> > +        private final Object ctx;
> > +
> > +        public LocalCallback(AsyncCallback cb, int rc, String path,
> Object ctx) {
> > +            this.cb = cb;
> > +            this.rc = rc;
> > +            this.path = path;
> > +            this.ctx = ctx;
> > +        }
> > +    }
> > +
> >      /**
> >       * Guard against creating
> "-EventThread-EventThread-EventThread-..." thread
> >       * names when ZooKeeper object is being created from within a
> watcher.
> > @@ -435,6 +460,8 @@ public class ClientCnxn {
> >          return name + suffix;
> >      }
>
> There should be documentation for what this lock synchronizes.  I believe
> the concern is that ZKWatchManager.materialize() and
> ZKWatchManager.remove() both access the members dataWatches, existWatches,
> and childWatches.  However, within both materialize() and remove() there
> are synchronized blocks protecting the maps individually.  Is the concern
> therefore that you need a lock across all of them simultaneously, or have
> I missed something?  If not, mightn't it be better to simply make
> materialize() and remove() both synchronized, or does that introduce some
> bottleneck (I don't see it)?
>
> I suppose this change introduces the minimum amount of synchronization
> necessary (least change vs. existing behavior), but I guess I'd prefer to
> see it fixed for any potential use cases now rather than have the next
> person discover belatedly that he/she needs to have this kind of locking.
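
To make the locking question concrete, here is a minimal sketch of one lock spanning several watch maps, so that a compound "look in every map, then remove" step cannot interleave with materialization. It is not ZooKeeper's code: the class WatchTablesSketch, the field tablesLock (standing in for eventThreadLock), and the use of String in place of Watcher are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the client-side watch manager; names are illustrative only.
final class WatchTablesSketch {
    private final Map<String, Set<String>> dataWatches = new HashMap<>();
    private final Map<String, Set<String>> existWatches = new HashMap<>();
    // One lock covering both maps, playing the role the patch gives eventThreadLock.
    private final Object tablesLock = new Object();

    void addData(String path, String watcher) {
        synchronized (tablesLock) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
    }

    void addExist(String path, String watcher) {
        synchronized (tablesLock) {
            existWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
    }

    // Collects and clears every watcher registered on the path in one atomic step.
    Set<String> materialize(String path) {
        Set<String> result = new HashSet<>();
        synchronized (tablesLock) {
            Set<String> data = dataWatches.remove(path);
            if (data != null) {
                result.addAll(data);
            }
            Set<String> exist = existWatches.remove(path);
            if (exist != null) {
                result.addAll(exist);
            }
        }
        return result;
    }

    // Removes the watcher from whichever map holds it; returns true if it was found.
    boolean remove(String path, String watcher) {
        synchronized (tablesLock) {
            // Non-short-circuit '|' so both maps are always checked.
            return removeFrom(dataWatches, path, watcher)
                    | removeFrom(existWatches, path, watcher);
        }
    }

    private static boolean removeFrom(Map<String, Set<String>> map, String path, String watcher) {
        Set<String> set = map.get(path);
        if (set == null || !set.remove(watcher)) {
            return false;
        }
        if (set.isEmpty()) {
            map.remove(path);
        }
        return true;
    }
}
```

With per-map locks only, a remove() that has checked dataWatches but not yet existWatches could race with a materialize() for the same path; the single lock rules that out at the cost of serializing the two operations.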
>
> > +    private final Object eventThreadLock = new Object();
> > +
> >      class EventThread extends Thread {
> >          private final LinkedBlockingQueue<Object> waitingEvents =
> >              new LinkedBlockingQueue<Object>();
> > @@ -454,6 +481,10 @@ public class ClientCnxn {
> >              setDaemon(true);
> >          }
> >
> > +        public void queueCallback(AsyncCallback cb, int rc, String
> path, Object ctx) {
> > +            waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
> > +        }
> > +
> >          public void queueEvent(WatchedEvent event) {
> >              if (event.getType() == EventType.None
> >                      && sessionState == event.getState()) {
> > @@ -461,13 +492,15 @@ public class ClientCnxn {
> >              }
> >              sessionState = event.getState();
> >
> > -            // materialize the watchers based on the event
> > -            WatcherSetEventPair pair = new WatcherSetEventPair(
> > -                    watcher.materialize(event.getState(),
> event.getType(),
> > -                            event.getPath()),
> > -                            event);
> > -            // queue the pair (watch set & event) for later processing
> > -            waitingEvents.add(pair);
> > +            synchronized (eventThreadLock) {
> > +                // materialize the watchers based on the event
> > +                WatcherSetEventPair pair = new WatcherSetEventPair(
> > +                        watcher.materialize(event.getState(),
> event.getType(),
> > +                                event.getPath()),
> > +                                event);
> > +                // queue the pair (watch set & event) for later
> processing
> > +                waitingEvents.add(pair);
> > +            }
> >          }
> >
> >         public void queuePacket(Packet packet) {
> > @@ -523,6 +556,23 @@ public class ClientCnxn {
> >                            LOG.error("Error while calling watcher ", t);
> >                        }
> >                    }
> > +              } else if (event instanceof LocalCallback) {
> > +                  LocalCallback lcb = (LocalCallback) event;
> > +                  if (lcb.cb instanceof StatCallback) {
> > +                      ((StatCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null);
> > +                  } else if (lcb.cb instanceof DataCallback) {
> > +                      ((DataCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null, null);
> > +                  } else if (lcb.cb instanceof ACLCallback) {
> > +                      ((ACLCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null, null);
> > +                  } else if (lcb.cb instanceof ChildrenCallback) {
> > +                      ((ChildrenCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null);
> > +                  } else if (lcb.cb instanceof Children2Callback) {
> > +                      ((Children2Callback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null, null);
> > +                  } else if (lcb.cb instanceof StringCallback) {
> > +                      ((StringCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx, null);
> > +                  } else {
> > +                      ((VoidCallback)lcb.cb).processResult(lcb.rc,
> lcb.path, lcb.ctx);
> > +                  }
>
> Ugh.  I suppose there's no help for it, though.
>
> >                } else {
> >                    Packet p = (Packet) event;
> >                    int rc = 0;
> > @@ -629,6 +679,9 @@ public class ClientCnxn {
> >          if (p.watchRegistration != null) {
> >              p.watchRegistration.register(p.replyHeader.getErr());
> >          }
> > +        if (p.watchDeregistration != null) {
> > +            p.watchDeregistration.unregister(p.replyHeader.getErr());
> > +        }
> >
> >          if (p.cb == null) {
> >              synchronized (p) {
> > @@ -1250,10 +1303,16 @@ public class ClientCnxn {
> >
> >      public ReplyHeader submitRequest(RequestHeader h, Record request,
> >              Record response, WatchRegistration watchRegistration)
> > +            throws InterruptedException  {
> > +        return submitRequest(h, request, response, watchRegistration,
> null);
> > +    }
> > +
> > +    public ReplyHeader submitRequest(RequestHeader h, Record request,
> > +            Record response, WatchRegistration watchRegistration,
> WatchDeregistration watchDeregistration)
> >              throws InterruptedException {
> >          ReplyHeader r = new ReplyHeader();
> >          Packet packet = queuePacket(h, r, request, response, null,
> null, null,
> > -                    null, watchRegistration);
> > +                    null, watchRegistration, watchDeregistration);
> >          synchronized (packet) {
> >              while (!packet.finished) {
> >                  packet.wait();
> > @@ -1266,6 +1325,14 @@ public class ClientCnxn {
> >              Record response, AsyncCallback cb, String clientPath,
> >              String serverPath, Object ctx, WatchRegistration
> watchRegistration)
> >      {
> > +        return queuePacket(h, r, request, response, cb, clientPath,
> serverPath, ctx, watchRegistration, null);
> > +    }
> > +
> > +    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
> > +            Record response, AsyncCallback cb, String clientPath,
> > +            String serverPath, Object ctx, WatchRegistration
> watchRegistration,
> > +            WatchDeregistration watchDeregistration)
> > +    {
> >          Packet packet = null;
> >          synchronized (outgoingQueue) {
> >              if (h.getType() != OpCode.ping && h.getType() !=
> OpCode.auth) {
> > @@ -1276,6 +1343,7 @@ public class ClientCnxn {
> >              packet.ctx = ctx;
> >              packet.clientPath = clientPath;
> >              packet.serverPath = serverPath;
> > +            packet.watchDeregistration = watchDeregistration;
> >              if (!state.isAlive() || closing) {
> >                  conLossPacket(packet);
> >              } else {
> > diff --git a/src/java/main/org/apache/zookeeper/KeeperException.java
> b/src/java/main/org/apache/zookeeper/KeeperException.java
> > --- a/src/java/main/org/apache/zookeeper/KeeperException.java
> > +++ b/src/java/main/org/apache/zookeeper/KeeperException.java
> > @@ -131,7 +131,8 @@ public abstract class KeeperException ex
> >                  return new SessionMovedException();
> >              case NOTREADONLY:
> >                  return new NotReadOnlyException();
> > -
> > +            case NOWATCHER:
> > +                return new NoWatcherException();
> >              case OK:
> >              default:
> >                  throw new IllegalArgumentException("Invalid exception
> code");
> > @@ -345,7 +346,9 @@ public abstract class KeeperException ex
> >          /** Session moved to another server, so operation is ignored */
> >          SESSIONMOVED (-118),
> >          /** State-changing request is passed to read-only server */
> > -        NOTREADONLY (-119);
> > +        NOTREADONLY (-119),
> > +        /** Tried to remove inexistent watcher */
> > +        NOWATCHER (-120);
>
> "nonexistent"?
>
> >
> >          private static final Map<Integer,Code> lookup
> >              = new HashMap<Integer,Code>();
> > @@ -422,6 +425,8 @@ public abstract class KeeperException ex
> >                  return "Session moved";
> >              case NOTREADONLY:
> >                  return "Not a read-only call";
> > +            case NOWATCHER:
> > +                return "No such watcher";
> >              default:
> >                  return "Unknown error " + code;
> >          }
> > @@ -696,4 +701,13 @@ public abstract class KeeperException ex
> >              super(Code.UNIMPLEMENTED);
> >          }
> >      }
> > +
> > +    /**
> > +     * @see Code#NOWATCHER
> > +     */
> > +    public static class NoWatcherException extends KeeperException {
> > +        public NoWatcherException() {
> > +            super(Code.NOWATCHER);
> > +        }
> > +    }
> >  }
> > diff --git a/src/java/main/org/apache/zookeeper/Watcher.java
> b/src/java/main/org/apache/zookeeper/Watcher.java
> > --- a/src/java/main/org/apache/zookeeper/Watcher.java
> > +++ b/src/java/main/org/apache/zookeeper/Watcher.java
> > @@ -145,5 +145,10 @@ public interface Watcher {
> >          }
> >      }
> >
> > +    public enum WatcherType {
> > +        Children,
> > +        Data,
> > +    }
> > +
> >      abstract public void process(WatchedEvent event);
> >  }
> > diff --git a/src/java/main/org/apache/zookeeper/ZooDefs.java
> b/src/java/main/org/apache/zookeeper/ZooDefs.java
> > --- a/src/java/main/org/apache/zookeeper/ZooDefs.java
> > +++ b/src/java/main/org/apache/zookeeper/ZooDefs.java
> > @@ -54,6 +54,8 @@ public class ZooDefs {
> >
> >          public final int multi = 14;
> >
> > +        public final int removeWatches = 15;
> > +
> >          public final int auth = 100;
> >
> >          public final int setWatches = 101;
> > diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java
> b/src/java/main/org/apache/zookeeper/ZooKeeper.java
> > --- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
> > +++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
> > @@ -18,8 +18,11 @@
> >
> >  package org.apache.zookeeper;
> >
> > +import java.util.concurrent.Exchanger;
> >  import org.apache.zookeeper.AsyncCallback.*;
> > +import org.apache.zookeeper.KeeperException.Code;
> >  import org.apache.zookeeper.OpResult.ErrorResult;
> > +import org.apache.zookeeper.Watcher.WatcherType;
> >  import org.apache.zookeeper.client.ConnectStringParser;
> >  import org.apache.zookeeper.client.HostProvider;
> >  import org.apache.zookeeper.client.StaticHostProvider;
> > @@ -222,6 +225,108 @@ public class ZooKeeper {
> >
> >              return result;
> >          }
> > +
> > +        private boolean containsWatcher (Map<String,Set<Watcher>>
> watches, Watcher watcher, String path) {
> > +            Set<Watcher> watchers = watches.get(path);
> > +            if (watchers == null) {
> > +                return false;
> > +            }
> > +            if (watcher != null) {
> > +                return watchers.contains(watcher);
> > +            } else {
> > +                return true;
> > +            }
> > +        }
> > +
> > +        private boolean removeWatches (Map<String,Set<Watcher>>
> watches, Watcher watcher, String path, boolean local) {
> > +            if (!local && watcher == null) {
> > +                // Remove all watchers
> > +                watches.remove(path);
> > +                return true;
> > +            }
> > +            Set<Watcher> watchers = watches.get(path);
> > +            if (local && (watcher == null || watchers.size() == 1)) {
> > +                return false;
> > +            } else {
> > +                watchers.remove(watcher);
> > +                return true;
> > +            }
> > +        }
>
> Please elaborate on why 'local' is necessary: I see that remove() below is
> called with 'true' in ZooKeeper.removeWatches() and 'false' in
> WatchDeregistration.unregister(), but I decided I was too lazy to parse
> the semantics of the behavior difference in the two cases and why they are
> necessary without any documentation.
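
One possible reading of the 'local' flag, sketched as a standalone copy of the patch's private removeWatches() helper. The class name LocalRemovalSketch and the use of String in place of Watcher are assumptions for illustration; the comments are an interpretation of the quoted code and its javadoc ("false if the removal can't be done local to the client"), not something the patch states.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical standalone copy of the helper, with String standing in for Watcher.
final class LocalRemovalSketch {
    // Like the patch, this assumes the caller has already confirmed (as
    // containsWatcher() does) that the path has a watcher set.
    static boolean removeWatches(Map<String, Set<String>> watches,
                                 String watcher, String path, boolean local) {
        if (!local && watcher == null) {
            // Triggered by the server's reply: drop every watcher on the path.
            watches.remove(path);
            return true;
        }
        Set<String> watchers = watches.get(path);
        if (local && (watcher == null || watchers.size() == 1)) {
            // Client-initiated, and the path would be left with no watchers:
            // apparently the server's watch must be removed too, so nothing is
            // changed here and the caller is told to send a request.
            return false;
        }
        // Either other watchers remain on the path (so the server-side watch
        // is still needed), or this is the server-confirmed removal.
        watchers.remove(watcher);
        return true;
    }
}
```

Under this reading, 'true' means "try to satisfy the removal on the client alone" and 'false' means "the server has acknowledged the removal, so apply it unconditionally".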
>
> > +
> > +        /**
> > +         * Removes a watcher from the interest lists.
> > +         *
> > +         * @param clientPath path where the watcher is set
> > +         * @param watcher concrete watcher object or null for any
> watcher set on the znode
> > +         * @param type watcher type
> > +         * @param local whether the call is local to the client or has
> ben triggered by a server's message
> > +         * @return false if the removal can't be done local to the
> client
> > +         * @throws KeeperException if no such watcher exists
> > +         */
> > +        public boolean remove(String clientPath, Watcher watcher,
> WatcherType type, boolean local) throws KeeperException {
> > +            boolean contains = false;
> > +            switch (type) {
> > +            case Children: {
> > +                synchronized(childWatches) {
> > +                    contains = containsWatcher(childWatches, watcher,
> clientPath);
> > +                    if (contains) {
> > +                        return removeWatches(childWatches, watcher,
> clientPath, local);
> > +                    }
> > +                }
> > +                break;
> > +            }
> > +            case Data: {
> > +                synchronized (dataWatches) {
> > +                    contains = containsWatcher(dataWatches, watcher,
> clientPath);
> > +                    if (contains && removeWatches(dataWatches, watcher,
> clientPath, local)) {
> > +                        return true;
> > +                    }
> > +                }
> > +                synchronized (existWatches) {
> > +                    boolean contains_temp =
> containsWatcher(existWatches, watcher, clientPath);
> > +                    if (contains_temp && removeWatches(existWatches,
> watcher, clientPath, local)) {
> > +                        return true;
> > +                    }
> > +                    contains |= contains_temp;
> > +                }
> > +            }
> > +            }
> > +            if (!contains) {
> > +                throw KeeperException.create(Code.NOWATCHER,
> clientPath);
> > +            }
> > +            return false;
> > +        }
> > +    }
> > +
> > +    /**
> > +     * Unregisters a set of watchers for a particular path.
> > +     */
> > +    class WatchDeregistration {
> > +        private Watcher watcher;
> > +        private WatcherType type;
> > +        private String clientPath;
> > +        public WatchDeregistration(Watcher watcher, WatcherType type,
> String clientPath)
> > +        {
> > +            this.watcher = watcher;
> > +            this.type = type;
> > +            this.clientPath = clientPath;
> > +        }
> > +
> > +        /**
> > +         * Unregisters the watcher with the set of watches on path.
> > +         * @param rc the result code of the operation that attempted to
> > +         * add the watch on the path.
> > +         */
>
> Looks like copy/paste error from WatchRegistration.  The "add" should be
> "remove".
>
> > +        public void unregister(int rc) {
> > +            if (rc != 0)
> > +                return;
> > +            try {
> > +                watchManager.remove(clientPath, watcher, type, false);
> > +            } catch (KeeperException e) {
> > +                // Shouldn't happen
> > +                throw new RuntimeException(e);
> > +            }
> > +        }
> >      }
> >
> >      /**
> > @@ -1629,6 +1734,118 @@ public class ZooKeeper {
> >      }
> >
> >      /**
> > +     * For the given znode removes the specified watchers.
> > +     * <p>
> > +     * If watcher is null all watchers will be removed, otherwise only
> the
> > +     * specified watcher will be deleted.
> > +     * <p>
> > +     * A successfull call guarantees that the removed watcher won't be
> triggered.
> > +     * This call is also intended to avoid watch leaks both client and
> serverside.
> > +     * <p>
> > +     * A KeeperException with error code KeeperException.NoWatcher will
> be thrown
> > +     * if no watcher exists that match the specified parameters.
> > +     *
> > +     * TODO
> > +     * @since 3.X.X
> > +     *
> > +     * @param path
> > +     * @param watcher a concrete watcher or null to remove all
> > +     * @param type the type of watcher to be removed
> > +     * @throws InterruptedException If the server transaction is
> interrupted.
> > +     * @throws KeeperException If the server signals an error with a
> non-zero
> > +     *  error code.
> > +     */
> > +    public void removeWatches(String path, Watcher watcher, WatcherType
> type)
> > +            throws InterruptedException, KeeperException
> > +    {
> > +        final Exchanger<Integer> exchanger = new Exchanger<Integer>();
> > +
> > +        VoidCallback cb = new VoidCallback() {
> > +            @Override
> > +            public void processResult(int rc, String path, Object ctx) {
> > +                while(true) {
> > +                    try {
> > +                        exchanger.exchange(rc);
> > +                        return;
> > +                    } catch (InterruptedException e) {
> > +                        // needed?
> > +                        Thread.currentThread().interrupt();
> > +                    }
> > +                }
> > +            }
> > +        };
>
> So it looks like this loop is here to suppress InterruptedExceptions and
> keep trying the exchange() until it works.  I believe this was done so that
> the opposing thread in the exchange will not be left blocked in exchange()
> forever (as you noted in the comment below).  I thought I'd found a case
> where the loop was infinite if an exception was thrown before
> queueCallback() could be called, but it looks like you have that covered,
> too.  Uhm, seems like it'd be good to document this since I spent about 10
> minutes noodling about how this all worked.  I gotta say I'm not a fan of
> this weird two matching loops with exception suppression thing, but no
> alternative comes readily to mind.
>
> I do believe that the
>
> Thread.currentThread().interrupt()
>
> is necessary.
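
For reference, a minimal sketch of the retry-until-exchanged idea with the interrupt restored once the exchange has completed. This is a common variation, not what the patch does: java.util.concurrent.Exchanger documents that exchange() throws InterruptedException when the interrupted status is already set on entry, so re-asserting the interrupt inside the loop may make the next attempt throw again straight away. The class ExchangeSketch and its method names are hypothetical.

```java
import java.util.concurrent.Exchanger;

// Hypothetical helper; not part of the patch or of ZooKeeper.
final class ExchangeSketch {
    // Retries exchange() until it succeeds, remembering any interrupt and
    // restoring the thread's interrupted status only on the way out.
    static <T> T exchangeUninterruptibly(Exchanger<T> exchanger, T value) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return exchanger.exchange(value);
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and try again
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Demo: a "callback" thread hands a result code to the waiting caller,
    // much as processResult() hands rc to the synchronous removeWatches().
    static int handOff(int rc) {
        Exchanger<Integer> exchanger = new Exchanger<>();
        Thread callback = new Thread(() -> exchangeUninterruptibly(exchanger, rc));
        callback.setDaemon(true);
        callback.start();
        return exchangeUninterruptibly(exchanger, 0);
    }
}
```

Both sides still block until their partner arrives, so the property the loops are there for (neither thread is left stranded in exchange()) is kept, and the caller can still observe afterwards that an interrupt occurred.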
>
> > +
> > +        KeeperException ke = null;
> > +
> > +        // Call the async version
> > +        try {
> > +            removeWatches(path, watcher, type, cb, null);
> > +        } catch (KeeperException e) {
> > +            // Can't rethrow yet, must call exchanger.exchange() to
> unblock eventThread on processResult()
> > +            ke = e;
> > +        }
> > +
> > +        int rc;
> > +        while(true) {
> > +            try {
> > +                rc = exchanger.exchange(0);
> > +                break;
> > +            } catch (InterruptedException e) {
> > +                // needed?
> > +                Thread.currentThread().interrupt();
> > +            }
> > +        }
> > +
> > +        if (ke != null) {
> > +            throw ke;
> > +        }
> > +
> > +        if (rc != 0) {
> > +            throw KeeperException.create(KeeperException.Code.get(rc),
> path);
> > +        }
> > +    }
> > +
> > +    /**
> > +     * The asynchronous version of removeWatches. It is guaranteed that
> the watcher won't
> > +     * be triggered after the callback is notified.
> > +     *
> > +     * @throws KeeperException
> > +     *
> > +     * @since 3.3.0
> > +     *
> > +     * @see #removeWatches(String, Watcher, Watcher.WatcherType)
> > +     */
> > +    public void removeWatches(String path, Watcher watcher, WatcherType
> type, VoidCallback cb,
> > +            Object ctx) throws KeeperException
> > +    {
> > +        final String clientPath = path;
> > +        PathUtils.validatePath(clientPath);
> > +
> > +        final String serverPath = prependChroot(clientPath);
> > +
> > +        WatchDeregistration wcb = new WatchDeregistration(watcher,
> type, clientPath);
> > +
> > +        RequestHeader h = new RequestHeader();
> > +        h.setType(ZooDefs.OpCode.removeWatches);
> > +        try {
> > +            synchronized (cnxn.getEventThreadLock()) {
> > +                if (watchManager.remove(clientPath, watcher, type,
> true)) {
> > +                    cnxn.queueCallback(cb,
> KeeperException.Code.OK.intValue(), clientPath, ctx);
> > +                    return;
> > +                }
> > +            }
> > +        } catch (KeeperException ke) {
> > +            cnxn.queueCallback(cb, ke.code().intValue(), clientPath,
> ctx);
> > +            // TODO should we throw the exception or just return and
> let the callback handle this case?
> > +            throw ke;
> > +        }
> > +        RemoveWatchesRequest request = new RemoveWatchesRequest();
> > +        request.setPath(serverPath);
> > +        request.setType(type.ordinal());
> > +        cnxn.queuePacket(h, new ReplyHeader(), request, null, cb,
> clientPath, serverPath, ctx, null, wcb);
> > +    }
> > +
> > +    /**
> >       * Asynchronous sync. Flushes channel between process and leader.
> >       * @param path
> >       * @param cb a handler for the callback
> > diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java
> b/src/java/main/org/apache/zookeeper/server/DataTree.java
> > --- a/src/java/main/org/apache/zookeeper/server/DataTree.java
> > +++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
> > @@ -48,6 +48,7 @@ import org.apache.zookeeper.KeeperExcept
> >  import org.apache.zookeeper.Watcher.Event;
> >  import org.apache.zookeeper.Watcher.Event.EventType;
> >  import org.apache.zookeeper.Watcher.Event.KeeperState;
> > +import org.apache.zookeeper.Watcher.WatcherType;
> >  import org.apache.zookeeper.ZooDefs.Ids;
> >  import org.apache.zookeeper.ZooDefs.OpCode;
> >  import org.apache.zookeeper.common.PathTrie;
> > @@ -1275,4 +1276,15 @@ public class DataTree {
> >              }
> >          }
> >      }
> > +
> > +    public void removeWatch(String path, WatcherType type, Watcher
> watcher) {
> > +        switch (type) {
> > +        case Children:
> > +            this.childWatches.removeWatcher(path, watcher);
> > +            break;
> > +        case Data:
> > +            this.dataWatches.removeWatcher(path, watcher);
> > +            break;
> > +        }
> > +    }
> >  }
> > diff --git
> a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
> b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
> > ---
> a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
> > +++
> b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
> > @@ -30,6 +30,7 @@ import org.apache.zookeeper.MultiRespons
> >  import org.apache.zookeeper.ZooDefs;
> >  import org.apache.zookeeper.KeeperException.Code;
> >  import org.apache.zookeeper.KeeperException.SessionMovedException;
> > +import org.apache.zookeeper.Watcher.WatcherType;
> >  import org.apache.zookeeper.ZooDefs.OpCode;
> >  import org.apache.zookeeper.data.ACL;
> >  import org.apache.zookeeper.data.Stat;
> > @@ -44,6 +45,7 @@ import org.apache.zookeeper.proto.GetChi
> >  import org.apache.zookeeper.proto.GetChildrenResponse;
> >  import org.apache.zookeeper.proto.GetDataRequest;
> >  import org.apache.zookeeper.proto.GetDataResponse;
> > +import org.apache.zookeeper.proto.RemoveWatchesRequest;
> >  import org.apache.zookeeper.proto.ReplyHeader;
> >  import org.apache.zookeeper.proto.SetACLResponse;
> >  import org.apache.zookeeper.proto.SetDataResponse;
> > @@ -312,6 +314,14 @@ public class FinalRequestProcessor imple
> >                          setWatches.getChildWatches(), cnxn);
> >                  break;
> >              }
> > +            case OpCode.removeWatches: {
> > +                lastOp = "REMW";
> > +                RemoveWatchesRequest removeWatches = new
> RemoveWatchesRequest();
> > +
>  ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
> > +                WatcherType type =
> WatcherType.values()[removeWatches.getType()];
> > +
>  zks.getZKDatabase().removeWatch(removeWatches.getPath(), type, cnxn);
> > +                break;
> > +            }
> >              case OpCode.getACL: {
> >                  lastOp = "GETA";
> >                  GetACLRequest getACLRequest = new GetACLRequest();
> > diff --git a/src/java/main/org/apache/zookeeper/server/Request.java
> b/src/java/main/org/apache/zookeeper/server/Request.java
> > --- a/src/java/main/org/apache/zookeeper/server/Request.java
> > +++ b/src/java/main/org/apache/zookeeper/server/Request.java
> > @@ -132,6 +132,7 @@ public class Request {
> >          case OpCode.ping:
> >          case OpCode.closeSession:
> >          case OpCode.setWatches:
> > +        case OpCode.removeWatches:
> >              return true;
> >          default:
> >              return false;
> > @@ -169,6 +170,8 @@ public class Request {
> >              return "create";
> >          case OpCode.setWatches:
> >              return "setWatches";
> > +        case OpCode.removeWatches:
> > +            return "removeWatches";
> >          case OpCode.delete:
> >              return "delete";
> >          case OpCode.exists:
> > diff --git a/src/java/main/org/apache/zookeeper/server/WatchManager.java
> b/src/java/main/org/apache/zookeeper/server/WatchManager.java
> > --- a/src/java/main/org/apache/zookeeper/server/WatchManager.java
> > +++ b/src/java/main/org/apache/zookeeper/server/WatchManager.java
> > @@ -88,6 +88,22 @@ class WatchManager {
> >          }
> >      }
> >
> > +    synchronized void removeWatcher(String path, Watcher watcher) {
> > +        HashSet<String> paths = watch2Paths.get(watcher);
> > +        if (paths == null) {
> > +            return;
> > +        }
> > +        paths.remove(path);
> > +        HashSet<Watcher> list = watchTable.get(path);
> > +        if (list == null) {
> > +            return;
> > +        }
> > +        list.remove(watcher);
> > +        if (list.size() == 0) {
> > +            watchTable.remove(path);
> > +        }
> > +    }
> > +
> >      Set<Watcher> triggerWatch(String path, EventType type) {
> >          return triggerWatch(path, type, null);
> >      }
> > diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
> b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
> > --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
> > +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
> > @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
> >  import org.apache.zookeeper.KeeperException;
> >  import org.apache.zookeeper.KeeperException.NoNodeException;
> >  import org.apache.zookeeper.Watcher;
> > +import org.apache.zookeeper.Watcher.WatcherType;
> >  import org.apache.zookeeper.data.ACL;
> >  import org.apache.zookeeper.data.Stat;
> >  import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
> > @@ -373,6 +374,17 @@ public class ZKDatabase {
> >      }
> >
> >      /**
> > +     * remove watch from the datatree
> > +     *
> > +     * @param path node to remove watches from
> > +     * @param type the type of watcher to remove
> > +     * @param watcher the watcher function to remove
> > +     */
> > +    public void removeWatch(String path, WatcherType type, Watcher
> watcher) {
> > +        dataTree.removeWatch(path, type, watcher);
> > +    }
> > +
> > +    /**
> >       * get acl for a path
> >       * @param path the path to query for acl
> >       * @param stat the stat for the node
> > diff --git
> a/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java
> b/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java
> > new file mode 100644
> > --- /dev/null
> > +++ b/src/java/test/org/apache/zookeeper/test/RemoveWatchesTest.java
> > @@ -0,0 +1,231 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.zookeeper.test;
> > +
> > +import java.io.IOException;
> > +import java.util.concurrent.CountDownLatch;
> > +import java.util.concurrent.TimeUnit;
> > +
> > +import org.apache.zookeeper.AsyncCallback;
> > +import org.apache.zookeeper.CreateMode;
> > +import org.apache.zookeeper.KeeperException;
> > +import org.apache.zookeeper.WatchedEvent;
> > +import org.apache.zookeeper.Watcher;
> > +import org.apache.zookeeper.Watcher.WatcherType;
> > +import org.apache.zookeeper.ZooDefs.Ids;
> > +import org.apache.zookeeper.ZooKeeper;
> > +import org.junit.Assert;
> > +import org.junit.Test;
> > +
> > +public class RemoveWatchesTest extends ClientBase {
> > +    private class MyWatcher implements Watcher {
> > +        private final String path;
> > +        private String eventPath;
> > +        private CountDownLatch latch = new CountDownLatch(1);
> > +
> > +        public MyWatcher(String path) {
> > +            this.path = path;
> > +        }
> > +
> > +        public void process(WatchedEvent event) {
> > +            System.out.println("latch:" + path + " " + event.getPath());
> > +            this.eventPath = event.getPath();
> > +            latch.countDown();
> > +        }
> > +
> > +        public boolean matches() throws InterruptedException {
> > +            if (!latch.await(CONNECTION_TIMEOUT / 30,
> TimeUnit.MILLISECONDS)) {
> > +                return false;
> > +            }
>
> When I first saw this, I was concerned that there could be false test
> failures due to the await() timing out before process() would be called.
> However, I think I was wrong about this because you always seem to call
> matches() on a ZooKeeper client on which you've first called close(), which
> I presume means that all pending watcher notifications must have been
> delivered.  Is this what you have found or know to be true?  I think it's
> worth documenting this.
>
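The timing question above can be looked at in isolation.  The sketch below
is not from the patch: RecordingWatcher, fire(), and the timeout values are
made-up stand-ins for MyWatcher and Watcher.process(), and it uses only
java.util.concurrent.  It shows that a false result from matches() cannot by
itself tell "no event was ever delivered" apart from "the event had not
arrived when the timeout expired", which is why the ordering guarantee is
worth documenting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchTimeoutSketch {
    /** Hypothetical stand-in for MyWatcher: records one event path and releases a latch. */
    static final class RecordingWatcher {
        private final String path;
        private volatile String eventPath;
        private final CountDownLatch latch = new CountDownLatch(1);

        RecordingWatcher(String path) {
            this.path = path;
        }

        /** Plays the role of Watcher.process(): store the path, then release the latch. */
        void fire(String eventPath) {
            this.eventPath = eventPath;
            latch.countDown();
        }

        /** Returns false if no event arrives within timeoutMs or if the path differs. */
        boolean matches(long timeoutMs) {
            try {
                if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                    return false; // timed out: the event is late or was never sent
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
            return path.equals(eventPath);
        }
    }

    public static void main(String[] args) {
        RecordingWatcher delivered = new RecordingWatcher("/node1");
        delivered.fire("/node1");                   // event delivered before matches()
        System.out.println(delivered.matches(100)); // prints true

        RecordingWatcher never = new RecordingWatcher("/node2");
        System.out.println(never.matches(100));     // prints false after the timeout
    }
}
```

If the event is guaranteed to be delivered before matches() is called, the
timeout only bounds how long the negative assertions take.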
> > +            return path.equals(eventPath);
> > +        }
> > +    }
> > +
> > +    private class MyCallback implements AsyncCallback.VoidCallback {
> > +        private final String path;
> > +        private final int rc;
> > +        private String eventPath;
> > +        private int eventRc;
> > +        private CountDownLatch latch = new CountDownLatch(1);
> > +
> > +        public MyCallback(int rc, String path) {
> > +            this.rc = rc;
> > +            this.path = path;
> > +        }
> > +
> > +        @Override
> > +        public void processResult(int rc, String eventPath, Object ctx)
> {
> > +            System.out.println("latch:" + path + " " + eventPath);
> > +            this.eventPath = eventPath;
> > +            this.eventRc = rc;
> > +            this.latch.countDown();
> > +        }
> > +
> > +        public boolean matches() throws InterruptedException {
> > +            if (!latch.await(CONNECTION_TIMEOUT / 60,
> TimeUnit.MILLISECONDS)) {
> > +                return false;
> > +            }
> > +            return path.equals(eventPath) && rc == eventRc;
> > +        }
> > +    }
> > +
> > +    @Test
> > +    public void testRemoveWatcher() throws IOException,
> InterruptedException, KeeperException {
> > +        ZooKeeper zk1 = createClient();
> > +        ZooKeeper zk2 = createClient();
> > +        try {
> > +            zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            MyWatcher w1 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w1));
> > +            MyWatcher w2 = new MyWatcher("/node2");
> > +            Assert.assertNotNull(zk2.exists("/node2", w2));
> > +            zk2.removeWatches("/node2", w2, WatcherType.Data);
> > +            try {
> > +                zk2.removeWatches("/node2", w2, WatcherType.Children);
> > +                Assert.fail("Didn't complain about nonexistent
> watcher.");
> > +            } catch (KeeperException e) {
>
> Maybe catch NoWatcherException specifically?
>
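To illustrate why the narrower catch matters, here is a minimal sketch.  It
does not use ZooKeeper's classes: KeeperLikeException,
NoWatcherLikeException, ConnectionLossLikeException, and Removal are
hypothetical stand-ins defined only for this example.  Catching the base type
would let any failure satisfy the test, while catching the subclass first
separates the expected "no watcher" outcome from unrelated errors.

```java
final class SpecificCatchSketch {
    /** Hypothetical stand-in for a broad, checked base exception. */
    static class KeeperLikeException extends Exception {
        KeeperLikeException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the "no such watcher" failure. */
    static class NoWatcherLikeException extends KeeperLikeException {
        NoWatcherLikeException() {
            super("no watcher");
        }
    }

    /** Hypothetical stand-in for an unrelated failure, such as a lost connection. */
    static class ConnectionLossLikeException extends KeeperLikeException {
        ConnectionLossLikeException() {
            super("connection loss");
        }
    }

    /** An operation that may fail with any KeeperLikeException. */
    interface Removal {
        void run() throws KeeperLikeException;
    }

    /** Reports which outcome occurred; only "no watcher" is the expected one. */
    static String classify(Removal removal) {
        try {
            removal.run();
            return "no exception";
        } catch (NoWatcherLikeException expected) {
            return "no watcher";    // the case the test wants to see
        } catch (KeeperLikeException other) {
            return "other failure"; // a broad catch would silently accept this
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> { throw new NoWatcherLikeException(); }));
        System.out.println(classify(() -> { throw new ConnectionLossLikeException(); }));
        System.out.println(classify(() -> { }));
    }
}
```

In the test itself, the same effect would come from catching only the
specific exception type and letting anything else propagate as a test error.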
> > +                // should always happen
> > +            }
> > +            if (zk1 != null) {
> > +                zk1.close();
> > +                zk1 = null;
> > +            }
> > +            Assert.assertTrue(w1.matches());
> > +            Assert.assertFalse(w2.matches());
> > +            Assert.assertNull(zk2.exists("/node1", false));
> > +            Assert.assertNull(zk2.exists("/node2", false));
> > +        } finally {
> > +            if (zk1 != null)
> > +                zk1.close();
> > +            if (zk2 != null)
> > +                zk2.close();
> > +        }
> > +    }
> > +
> > +    @Test
> > +    public void testMultipleWatchers() throws IOException,
> InterruptedException, KeeperException {
> > +        ZooKeeper zk1 = createClient();
> > +        ZooKeeper zk2 = createClient();
> > +        try {
> > +            zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            MyWatcher w1 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w1));
> > +            MyWatcher w2 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w2));
> > +            zk2.removeWatches("/node1", w2, WatcherType.Data);
> > +            if (zk1 != null) {
> > +                zk1.close();
> > +                zk1 = null;
> > +            }
> > +            Assert.assertTrue(w1.matches());
> > +            Assert.assertFalse(w2.matches());
> > +            Assert.assertNull(zk2.exists("/node1", false));
> > +        } finally {
> > +            if (zk1 != null)
> > +                zk1.close();
> > +            if (zk2 != null)
> > +                zk2.close();
> > +        }
> > +    }
> > +
> > +    @Test
> > +    public void testMultipleTypes() throws IOException,
> InterruptedException, KeeperException {
> > +        ZooKeeper zk1 = createClient();
> > +        ZooKeeper zk2 = createClient();
> > +        try {
> > +            zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.PERSISTENT);
> > +            MyWatcher w1 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w1));
> > +            Assert.assertNotNull(zk2.getChildren("/node1", w1));
> > +            zk2.removeWatches("/node1", w1, WatcherType.Data);
> > +            zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            Assert.assertTrue(w1.matches());
> > +            Assert.assertNotNull(zk2.exists("/node1", false));
> > +            Assert.assertNotNull(zk2.exists("/node1/child", false));
> > +        } finally {
> > +            if (zk1 != null)
> > +                zk1.close();
> > +            if (zk2 != null)
> > +                zk2.close();
> > +        }
> > +    }
> > +
> > +    @Test
> > +    public void testRemoveAll() throws IOException,
> InterruptedException, KeeperException {
> > +        ZooKeeper zk1 = createClient();
> > +        ZooKeeper zk2 = createClient();
> > +        try {
> > +            zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            MyWatcher w1 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w1));
> > +            MyWatcher w2 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w2));
> > +            zk2.removeWatches("/node1", null, WatcherType.Data);
> > +            if (zk1 != null) {
> > +                zk1.close();
> > +                zk1 = null;
> > +            }
> > +            Assert.assertFalse(w1.matches());
> > +            Assert.assertFalse(w2.matches());
> > +            Assert.assertNull(zk2.exists("/node1", false));
> > +        } finally {
> > +            if (zk1 != null)
> > +                zk1.close();
> > +            if (zk2 != null)
> > +                zk2.close();
> > +        }
> > +    }
> > +
> > +    @Test
> > +    public void testRemoveAsync() throws IOException,
> InterruptedException, KeeperException {
> > +        ZooKeeper zk1 = createClient();
> > +        ZooKeeper zk2 = createClient();
> > +        try {
> > +            zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE,
> CreateMode.EPHEMERAL);
> > +            MyCallback c1 = new
> MyCallback(KeeperException.Code.NOWATCHER.intValue(), "/node1");
> > +            try {
> > +                zk1.removeWatches("/node1", null, WatcherType.Children,
> c1, null);
> > +                Assert.fail("Didn't raise exception");
> > +            } catch (KeeperException ke) {
> > +                Assert.assertEquals(KeeperException.Code.NOWATCHER,
> ke.code());
> > +            }
> > +            Assert.assertTrue(c1.matches());
> > +            MyWatcher w1 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w1));
> > +            MyWatcher w2 = new MyWatcher("/node1");
> > +            Assert.assertNotNull(zk2.exists("/node1", w2));
> > +            MyCallback c2 = new
> MyCallback(KeeperException.Code.OK.intValue(), "/node1");
> > +            zk2.removeWatches("/node1", null, WatcherType.Data, c2,
> null);
> > +            Assert.assertTrue(c2.matches());
> > +            if (zk1 != null) {
> > +                zk1.close();
> > +                zk1 = null;
> > +            }
> > +            Assert.assertFalse(w1.matches());
> > +            Assert.assertFalse(w2.matches());
> > +            Assert.assertNull(zk2.exists("/node1", false));
> > +        } finally {
> > +            if (zk1 != null)
> > +                zk1.close();
> > +            if (zk2 != null)
> > +                zk2.close();
> > +        }
> > +    }
> > +}
> > diff --git a/src/zookeeper.jute b/src/zookeeper.jute
> > --- a/src/zookeeper.jute
> > +++ b/src/zookeeper.jute
> > @@ -145,6 +145,10 @@ module org.apache.zookeeper.proto {
> >          ustring path;
> >          int max;
> >      }
> > +    class RemoveWatchesRequest {
> > +        ustring path;
> > +        int type;
> > +    }
> >      class SyncRequest {
> >          ustring path;
> >      }
>
> --
> Robert Crocombe
>
>
