This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d99c9e8  ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooKeeper
d99c9e8 is described below

commit d99c9e8b701ebdbd3f0098d4d81696df97432262
Author: tison <wander4...@gmail.com>
AuthorDate: Sun May 17 15:13:37 2020 +0200

    ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooKeeper
    
    1. Extract ZKWatchManager into its own file
    2. Move the ZKWatchManager instance into ClientCnxn in order to eliminate the cyclic dependency
    3. Let `ZooKeeper` sync a copy of the default watcher, in order to reduce dependencies on `getWatchManager()`
    
    Author: tison <wander4...@gmail.com>
    
    Reviewers: Enrico Olivelli <eolive...@apache.org>, Andor Molnar <an...@apache.org>
    
    Closes #1095 from TisonKun/ZOOKEEPER-837
---
 .../main/java/org/apache/zookeeper/ClientCnxn.java | 110 ++---
 .../org/apache/zookeeper/WatchDeregistration.java  |   1 -
 .../java/org/apache/zookeeper/ZKWatchManager.java  | 455 +++++++++++++++++++
 .../main/java/org/apache/zookeeper/ZooKeeper.java  | 504 +++------------------
 .../zookeeper/ClientCnxnSocketFragilityTest.java   |  35 +-
 .../org/apache/zookeeper/ClientReconnectTest.java  |  10 +-
 .../apache/zookeeper/ClientRequestTimeoutTest.java |  37 +-
 .../org/apache/zookeeper/RemoveWatchesTest.java    |  70 ++-
 .../org/apache/zookeeper/TestableZooKeeper.java    |  70 +--
 9 files changed, 651 insertions(+), 641 deletions(-)

diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index c87d3cb..87094b8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -165,13 +165,11 @@ public class ClientCnxn {
 
     private final int sessionTimeout;
 
-    private final ZooKeeper zooKeeper;
-
-    private final ClientWatchManager watcher;
+    private final ZKWatchManager watchManager;
 
     private long sessionId;
 
-    private byte[] sessionPasswd = new byte[16];
+    private byte[] sessionPasswd;
 
     /**
      * If true, the connection is allowed to go to r-o mode. This field's value
@@ -224,6 +222,10 @@ public class ClientCnxn {
      */
     private long requestTimeout;
 
+    ZKWatchManager getWatcherManager() {
+        return watchManager;
+    }
+
     public long getSessionId() {
         return sessionId;
     }
@@ -362,35 +364,29 @@ public class ClientCnxn {
      * established until needed. The start() instance method must be called
      * subsequent to construction.
      *
-     * @param chrootPath - the chroot of this client. Should be removed from 
this Class in ZOOKEEPER-838
-     * @param hostProvider
-     *                the list of ZooKeeper servers to connect to
-     * @param sessionTimeout
-     *                the timeout for connections.
-     * @param zooKeeper
-     *                the zookeeper object that this connection is related to.
-     * @param watcher watcher for this connection
-     * @param clientCnxnSocket
-     *                the socket implementation used (e.g. NIO/Netty)
-     * @param canBeReadOnly
-     *                whether the connection is allowed to go to read-only
-     *                mode in case of partitioning
-     * @throws IOException
+     * @param chrootPath the chroot of this client. Should be removed from 
this Class in ZOOKEEPER-838
+     * @param hostProvider the list of ZooKeeper servers to connect to
+     * @param sessionTimeout the timeout for connections.
+     * @param clientConfig the client configuration.
+     * @param defaultWatcher default watcher for this connection
+     * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
+     * @param canBeReadOnly whether the connection is allowed to go to 
read-only mode in case of partitioning
      */
     public ClientCnxn(
         String chrootPath,
         HostProvider hostProvider,
         int sessionTimeout,
-        ZooKeeper zooKeeper,
-        ClientWatchManager watcher,
+        ZKClientConfig clientConfig,
+        Watcher defaultWatcher,
         ClientCnxnSocket clientCnxnSocket,
-        boolean canBeReadOnly) throws IOException {
+        boolean canBeReadOnly
+    ) throws IOException {
         this(
             chrootPath,
             hostProvider,
             sessionTimeout,
-            zooKeeper,
-            watcher,
+            clientConfig,
+            defaultWatcher,
             clientCnxnSocket,
             0,
             new byte[16],
@@ -402,48 +398,45 @@ public class ClientCnxn {
      * established until needed. The start() instance method must be called
      * subsequent to construction.
      *
-     * @param chrootPath - the chroot of this client. Should be removed from 
this Class in ZOOKEEPER-838
-     * @param hostProvider
-     *                the list of ZooKeeper servers to connect to
-     * @param sessionTimeout
-     *                the timeout for connections.
-     * @param zooKeeper
-     *                the zookeeper object that this connection is related to.
-     * @param watcher watcher for this connection
-     * @param clientCnxnSocket
-     *                the socket implementation used (e.g. NIO/Netty)
+     * @param chrootPath the chroot of this client. Should be removed from 
this Class in ZOOKEEPER-838
+     * @param hostProvider the list of ZooKeeper servers to connect to
+     * @param sessionTimeout the timeout for connections.
+     * @param clientConfig the client configuration.
+     * @param defaultWatcher default watcher for this connection
+     * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
      * @param sessionId session id if re-establishing session
      * @param sessionPasswd session passwd if re-establishing session
-     * @param canBeReadOnly
-     *                whether the connection is allowed to go to read-only
-     *                mode in case of partitioning
+     * @param canBeReadOnly whether the connection is allowed to go to 
read-only mode in case of partitioning
      * @throws IOException in cases of broken network
      */
     public ClientCnxn(
         String chrootPath,
         HostProvider hostProvider,
         int sessionTimeout,
-        ZooKeeper zooKeeper,
-        ClientWatchManager watcher,
+        ZKClientConfig clientConfig,
+        Watcher defaultWatcher,
         ClientCnxnSocket clientCnxnSocket,
         long sessionId,
         byte[] sessionPasswd,
-        boolean canBeReadOnly) throws IOException {
-        this.zooKeeper = zooKeeper;
-        this.watcher = watcher;
+        boolean canBeReadOnly
+    ) throws IOException {
+        this.chrootPath = chrootPath;
+        this.hostProvider = hostProvider;
+        this.sessionTimeout = sessionTimeout;
+        this.clientConfig = clientConfig;
         this.sessionId = sessionId;
         this.sessionPasswd = sessionPasswd;
-        this.sessionTimeout = sessionTimeout;
-        this.hostProvider = hostProvider;
-        this.chrootPath = chrootPath;
+        this.readOnly = canBeReadOnly;
+
+        this.watchManager = new ZKWatchManager(
+                
clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
+                defaultWatcher);
 
-        connectTimeout = sessionTimeout / hostProvider.size();
-        readTimeout = sessionTimeout * 2 / 3;
-        readOnly = canBeReadOnly;
+        this.connectTimeout = sessionTimeout / hostProvider.size();
+        this.readTimeout = sessionTimeout * 2 / 3;
 
-        sendThread = new SendThread(clientCnxnSocket);
-        eventThread = new EventThread();
-        this.clientConfig = zooKeeper.getClientConfig();
+        this.sendThread = new SendThread(clientCnxnSocket);
+        this.eventThread = new EventThread();
         initRequestTimeout();
     }
 
@@ -506,10 +499,9 @@ public class ClientCnxn {
             final Set<Watcher> watchers;
             if (materializedWatchers == null) {
                 // materialize the watchers based on the event
-                watchers = watcher.materialize(event.getState(), 
event.getType(), event.getPath());
+                watchers = watchManager.materialize(event.getState(), 
event.getType(), event.getPath());
             } else {
-                watchers = new HashSet<Watcher>();
-                watchers.addAll(materializedWatchers);
+                watchers = new HashSet<>(materializedWatchers);
             }
             WatcherSetEventPair pair = new WatcherSetEventPair(watchers, 
event);
             // queue the pair (watch set & event) for later processing
@@ -1007,14 +999,12 @@ public class ClientCnxn {
             ConnectRequest conReq = new ConnectRequest(0, lastZxid, 
sessionTimeout, sessId, sessionPasswd);
             // We add backwards since we are pushing into the front
             // Only send if there's a pending watch
-            // TODO: here we have the only remaining use of zooKeeper in
-            // this class. It's to be eliminated!
             if 
(!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
-                List<String> dataWatches = zooKeeper.getDataWatches();
-                List<String> existWatches = zooKeeper.getExistWatches();
-                List<String> childWatches = zooKeeper.getChildWatches();
-                List<String> persistentWatches = 
zooKeeper.getPersistentWatches();
-                List<String> persistentRecursiveWatches = 
zooKeeper.getPersistentRecursiveWatches();
+                List<String> dataWatches = watchManager.getDataWatchList();
+                List<String> existWatches = watchManager.getExistWatchList();
+                List<String> childWatches = watchManager.getChildWatchList();
+                List<String> persistentWatches = 
watchManager.getPersistentWatchList();
+                List<String> persistentRecursiveWatches = 
watchManager.getPersistentRecursiveWatchList();
                 if (!dataWatches.isEmpty() || !existWatches.isEmpty() || 
!childWatches.isEmpty()
                         || !persistentWatches.isEmpty() || 
!persistentRecursiveWatches.isEmpty()) {
                     Iterator<String> dataWatchesIter = 
prependChroot(dataWatches).iterator();
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
index 16c7f84..710f47b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper.ZKWatchManager;
 
 /**
  * Handles the special case of removing watches which has registered for a
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
new file mode 100644
index 0000000..95a07f0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.server.watch.PathParentIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage watchers and handle events generated by the {@link ClientCnxn} 
object.
+ *
+ * This class is intended to be packaged-private so that it doesn't serve
+ * as part of ZooKeeper client API.
+ */
+class ZKWatchManager implements ClientWatchManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZKWatchManager.class);
+
+    private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
+    private final Map<String, Set<Watcher>> existWatches = new HashMap<>();
+    private final Map<String, Set<Watcher>> childWatches = new HashMap<>();
+    private final Map<String, Set<Watcher>> persistentWatches = new 
HashMap<>();
+    private final Map<String, Set<Watcher>> persistentRecursiveWatches = new 
HashMap<>();
+    private final boolean disableAutoWatchReset;
+
+    private volatile Watcher defaultWatcher;
+
+    ZKWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
+        this.disableAutoWatchReset = disableAutoWatchReset;
+        this.defaultWatcher = defaultWatcher;
+    }
+
+    void setDefaultWatcher(Watcher defaultWatcher) {
+        this.defaultWatcher = defaultWatcher;
+    }
+
+    Watcher getDefaultWatcher() {
+        return defaultWatcher;
+    }
+
+    List<String> getDataWatchList() {
+        synchronized (dataWatches) {
+            return new ArrayList<>(dataWatches.keySet());
+        }
+    }
+
+    List<String> getChildWatchList() {
+        synchronized (childWatches) {
+            return new ArrayList<>(childWatches.keySet());
+        }
+    }
+
+    List<String> getExistWatchList() {
+        synchronized (existWatches) {
+            return new ArrayList<>(existWatches.keySet());
+        }
+    }
+
+    List<String> getPersistentWatchList() {
+        synchronized (persistentWatches) {
+            return new ArrayList<>(persistentWatches.keySet());
+        }
+    }
+
+    List<String> getPersistentRecursiveWatchList() {
+        synchronized (persistentRecursiveWatches) {
+            return new ArrayList<>(persistentRecursiveWatches.keySet());
+        }
+    }
+
+    Map<String, Set<Watcher>> getDataWatches() {
+        return dataWatches;
+    }
+
+    Map<String, Set<Watcher>> getExistWatches() {
+        return existWatches;
+    }
+
+    Map<String, Set<Watcher>> getChildWatches() {
+        return childWatches;
+    }
+
+    Map<String, Set<Watcher>> getPersistentWatches() {
+        return persistentWatches;
+    }
+
+    Map<String, Set<Watcher>> getPersistentRecursiveWatches() {
+        return persistentRecursiveWatches;
+    }
+
+    private void addTo(Set<Watcher> from, Set<Watcher> to) {
+        if (from != null) {
+            to.addAll(from);
+        }
+    }
+
+    public Map<Watcher.Event.EventType, Set<Watcher>> removeWatcher(
+        String clientPath,
+        Watcher watcher,
+        Watcher.WatcherType watcherType,
+        boolean local,
+        int rc
+    ) throws KeeperException {
+        // Validate the provided znode path contains the given watcher of
+        // watcherType
+        containsWatcher(clientPath, watcher, watcherType);
+
+        Map<Watcher.Event.EventType, Set<Watcher>> removedWatchers = new 
HashMap<>();
+        HashSet<Watcher> childWatchersToRem = new HashSet<>();
+        removedWatchers.put(Watcher.Event.EventType.ChildWatchRemoved, 
childWatchersToRem);
+        HashSet<Watcher> dataWatchersToRem = new HashSet<>();
+        removedWatchers.put(Watcher.Event.EventType.DataWatchRemoved, 
dataWatchersToRem);
+        HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
+        removedWatchers.put(Watcher.Event.EventType.PersistentWatchRemoved, 
persistentWatchersToRem);
+        boolean removedWatcher = false;
+        switch (watcherType) {
+        case Children: {
+            synchronized (childWatches) {
+                removedWatcher = removeWatches(childWatches, watcher, 
clientPath, local, rc, childWatchersToRem);
+            }
+            break;
+        }
+        case Data: {
+            synchronized (dataWatches) {
+                removedWatcher = removeWatches(dataWatches, watcher, 
clientPath, local, rc, dataWatchersToRem);
+            }
+
+            synchronized (existWatches) {
+                boolean removedDataWatcher = removeWatches(existWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
+                removedWatcher |= removedDataWatcher;
+            }
+            break;
+        }
+        case Any: {
+            synchronized (childWatches) {
+                removedWatcher = removeWatches(childWatches, watcher, 
clientPath, local, rc, childWatchersToRem);
+            }
+
+            synchronized (dataWatches) {
+                boolean removedDataWatcher = removeWatches(dataWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
+                removedWatcher |= removedDataWatcher;
+            }
+
+            synchronized (existWatches) {
+                boolean removedDataWatcher = removeWatches(existWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
+                removedWatcher |= removedDataWatcher;
+            }
+
+            synchronized (persistentWatches) {
+                boolean removedPersistentWatcher = 
removeWatches(persistentWatches,
+                        watcher, clientPath, local, rc, 
persistentWatchersToRem);
+                removedWatcher |= removedPersistentWatcher;
+            }
+
+            synchronized (persistentRecursiveWatches) {
+                boolean removedPersistentRecursiveWatcher = 
removeWatches(persistentRecursiveWatches,
+                        watcher, clientPath, local, rc, 
persistentWatchersToRem);
+                removedWatcher |= removedPersistentRecursiveWatcher;
+            }
+        }
+        }
+        // Watcher function doesn't exists for the specified params
+        if (!removedWatcher) {
+            throw new KeeperException.NoWatcherException(clientPath);
+        }
+        return removedWatchers;
+    }
+
+    private boolean contains(String path, Watcher watcherObj, Map<String, 
Set<Watcher>> pathVsWatchers) {
+        boolean watcherExists = true;
+        if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
+            watcherExists = false;
+        } else {
+            Set<Watcher> watchers = pathVsWatchers.get(path);
+            if (watchers == null) {
+                watcherExists = false;
+            } else if (watcherObj == null) {
+                watcherExists = watchers.size() > 0;
+            } else {
+                watcherExists = watchers.contains(watcherObj);
+            }
+        }
+        return watcherExists;
+    }
+
+    /**
+     * Validate the provided znode path contains the given watcher and
+     * watcherType
+     *
+     * @param path
+     *            - client path
+     * @param watcher
+     *            - watcher object reference
+     * @param watcherType
+     *            - type of the watcher
+     * @throws KeeperException.NoWatcherException
+     */
+    void containsWatcher(String path, Watcher watcher, Watcher.WatcherType 
watcherType) throws
+            KeeperException.NoWatcherException {
+        boolean containsWatcher = false;
+        switch (watcherType) {
+        case Children: {
+            synchronized (childWatches) {
+                containsWatcher = contains(path, watcher, childWatches);
+            }
+
+            synchronized (persistentWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (persistentRecursiveWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentRecursiveWatches);
+                containsWatcher |= contains_temp;
+            }
+            break;
+        }
+        case Data: {
+            synchronized (dataWatches) {
+                containsWatcher = contains(path, watcher, dataWatches);
+            }
+
+            synchronized (existWatches) {
+                boolean contains_temp = contains(path, watcher, existWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (persistentWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (persistentRecursiveWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentRecursiveWatches);
+                containsWatcher |= contains_temp;
+            }
+            break;
+        }
+        case Any: {
+            synchronized (childWatches) {
+                containsWatcher = contains(path, watcher, childWatches);
+            }
+
+            synchronized (dataWatches) {
+                boolean contains_temp = contains(path, watcher, dataWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (existWatches) {
+                boolean contains_temp = contains(path, watcher, existWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (persistentWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentWatches);
+                containsWatcher |= contains_temp;
+            }
+
+            synchronized (persistentRecursiveWatches) {
+                boolean contains_temp = contains(path, watcher,
+                        persistentRecursiveWatches);
+                containsWatcher |= contains_temp;
+            }
+        }
+        }
+        // Watcher function doesn't exists for the specified params
+        if (!containsWatcher) {
+            throw new KeeperException.NoWatcherException(path);
+        }
+    }
+
+    protected boolean removeWatches(
+        Map<String, Set<Watcher>> pathVsWatcher,
+        Watcher watcher,
+        String path,
+        boolean local,
+        int rc,
+        Set<Watcher> removedWatchers) throws KeeperException {
+        if (!local && rc != KeeperException.Code.OK.intValue()) {
+            throw KeeperException.create(KeeperException.Code.get(rc), path);
+        }
+        boolean success = false;
+        // When local flag is true, remove watchers for the given path
+        // irrespective of rc. Otherwise shouldn't remove watchers locally
+        // when sees failure from server.
+        if (rc == KeeperException.Code.OK.intValue() || (local && rc != 
KeeperException.Code.OK.intValue())) {
+            // Remove all the watchers for the given path
+            if (watcher == null) {
+                Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
+                if (pathWatchers != null) {
+                    // found path watchers
+                    removedWatchers.addAll(pathWatchers);
+                    success = true;
+                }
+            } else {
+                Set<Watcher> watchers = pathVsWatcher.get(path);
+                if (watchers != null) {
+                    if (watchers.remove(watcher)) {
+                        // found path watcher
+                        removedWatchers.add(watcher);
+                        // cleanup <path vs watchlist>
+                        if (watchers.size() <= 0) {
+                            pathVsWatcher.remove(path);
+                        }
+                        success = true;
+                    }
+                }
+            }
+        }
+        return success;
+    }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
+     *                                                        Event.EventType, 
java.lang.String)
+     */
+    @Override
+    public Set<Watcher> materialize(
+        Watcher.Event.KeeperState state,
+        Watcher.Event.EventType type,
+        String clientPath
+    ) {
+        final Set<Watcher> result = new HashSet<>();
+
+        switch (type) {
+        case None:
+            if (defaultWatcher != null) {
+                result.add(defaultWatcher);
+            }
+
+            boolean clear = disableAutoWatchReset && state != 
Watcher.Event.KeeperState.SyncConnected;
+            synchronized (dataWatches) {
+                for (Set<Watcher> ws : dataWatches.values()) {
+                    result.addAll(ws);
+                }
+                if (clear) {
+                    dataWatches.clear();
+                }
+            }
+
+            synchronized (existWatches) {
+                for (Set<Watcher> ws : existWatches.values()) {
+                    result.addAll(ws);
+                }
+                if (clear) {
+                    existWatches.clear();
+                }
+            }
+
+            synchronized (childWatches) {
+                for (Set<Watcher> ws : childWatches.values()) {
+                    result.addAll(ws);
+                }
+                if (clear) {
+                    childWatches.clear();
+                }
+            }
+
+            synchronized (persistentWatches) {
+                for (Set<Watcher> ws: persistentWatches.values()) {
+                    result.addAll(ws);
+                }
+            }
+
+            synchronized (persistentRecursiveWatches) {
+                for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
+                    result.addAll(ws);
+                }
+            }
+
+            return result;
+        case NodeDataChanged:
+        case NodeCreated:
+            synchronized (dataWatches) {
+                addTo(dataWatches.remove(clientPath), result);
+            }
+            synchronized (existWatches) {
+                addTo(existWatches.remove(clientPath), result);
+            }
+            addPersistentWatches(clientPath, result);
+            break;
+        case NodeChildrenChanged:
+            synchronized (childWatches) {
+                addTo(childWatches.remove(clientPath), result);
+            }
+            addPersistentWatches(clientPath, result);
+            break;
+        case NodeDeleted:
+            synchronized (dataWatches) {
+                addTo(dataWatches.remove(clientPath), result);
+            }
+            // TODO This shouldn't be needed, but just in case
+            synchronized (existWatches) {
+                Set<Watcher> list = existWatches.remove(clientPath);
+                if (list != null) {
+                    addTo(list, result);
+                    LOG.warn("We are triggering an exists watch for delete! 
Shouldn't happen!");
+                }
+            }
+            synchronized (childWatches) {
+                addTo(childWatches.remove(clientPath), result);
+            }
+            addPersistentWatches(clientPath, result);
+            break;
+        default:
+            String errorMsg = String.format(
+                "Unhandled watch event type %s with state %s on path %s",
+                type,
+                state,
+                clientPath);
+            LOG.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        return result;
+    }
+
+    private void addPersistentWatches(String clientPath, Set<Watcher> result) {
+        synchronized (persistentWatches) {
+            addTo(persistentWatches.get(clientPath), result);
+        }
+        synchronized (persistentRecursiveWatches) {
+            for (String path : 
PathParentIterator.forAll(clientPath).asIterable()) {
+                addTo(persistentRecursiveWatches.get(path), result);
+            }
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 0210493..201cf20 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,10 +39,7 @@ import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoWatcherException;
 import org.apache.zookeeper.OpResult.ErrorResult;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
@@ -85,7 +81,6 @@ import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.EphemeralType;
-import org.apache.zookeeper.server.watch.PathParentIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -231,8 +226,6 @@ public class ZooKeeper implements AutoCloseable {
         return cnxn.zooKeeperSaslClient;
     }
 
-    protected final ZKWatchManager watchManager;
-
     private final ZKClientConfig clientConfig;
 
     public ZKClientConfig getClientConfig() {
@@ -240,407 +233,37 @@ public class ZooKeeper implements AutoCloseable {
     }
 
     protected List<String> getDataWatches() {
-        synchronized (watchManager.dataWatches) {
-            List<String> rc = new 
ArrayList<String>(watchManager.dataWatches.keySet());
-            return rc;
-        }
+        return getWatchManager().getDataWatchList();
     }
+
     protected List<String> getExistWatches() {
-        synchronized (watchManager.existWatches) {
-            List<String> rc = new 
ArrayList<String>(watchManager.existWatches.keySet());
-            return rc;
-        }
+        return getWatchManager().getExistWatchList();
     }
+
     protected List<String> getChildWatches() {
-        synchronized (watchManager.childWatches) {
-            List<String> rc = new 
ArrayList<String>(watchManager.childWatches.keySet());
-            return rc;
-        }
+        return getWatchManager().getChildWatchList();
     }
+
     protected List<String> getPersistentWatches() {
-        synchronized (watchManager.persistentWatches) {
-            List<String> rc = new 
ArrayList<String>(watchManager.persistentWatches.keySet());
-            return rc;
-        }
+        return getWatchManager().getPersistentWatchList();
     }
+
     protected List<String> getPersistentRecursiveWatches() {
-        synchronized (watchManager.persistentRecursiveWatches) {
-            List<String> rc = new 
ArrayList<String>(watchManager.persistentRecursiveWatches.keySet());
-            return rc;
-        }
+        return getWatchManager().getPersistentRecursiveWatchList();
     }
 
-    /**
-     * Manage watchers and handle events generated by the ClientCnxn object.
-     *
-     * We are implementing this as a nested class of ZooKeeper so that
-     * the public methods will not be exposed as part of the ZooKeeper client
-     * API.
-     */
-    static class ZKWatchManager implements ClientWatchManager {
-
-        private final Map<String, Set<Watcher>> dataWatches = new 
HashMap<String, Set<Watcher>>();
-        private final Map<String, Set<Watcher>> existWatches = new 
HashMap<String, Set<Watcher>>();
-        private final Map<String, Set<Watcher>> childWatches = new 
HashMap<String, Set<Watcher>>();
-        private final Map<String, Set<Watcher>> persistentWatches = new 
HashMap<String, Set<Watcher>>();
-        private final Map<String, Set<Watcher>> persistentRecursiveWatches = 
new HashMap<String, Set<Watcher>>();
-        private boolean disableAutoWatchReset;
-
-        ZKWatchManager(boolean disableAutoWatchReset) {
-            this.disableAutoWatchReset = disableAutoWatchReset;
-        }
-
-        protected volatile Watcher defaultWatcher;
-
-        private void addTo(Set<Watcher> from, Set<Watcher> to) {
-            if (from != null) {
-                to.addAll(from);
-            }
-        }
-
-        public Map<EventType, Set<Watcher>> removeWatcher(
-            String clientPath,
-            Watcher watcher,
-            WatcherType watcherType,
-            boolean local,
-            int rc) throws KeeperException {
-            // Validate the provided znode path contains the given watcher of
-            // watcherType
-            containsWatcher(clientPath, watcher, watcherType);
-
-            Map<EventType, Set<Watcher>> removedWatchers = new HashMap<>();
-            HashSet<Watcher> childWatchersToRem = new HashSet<>();
-            removedWatchers.put(EventType.ChildWatchRemoved, 
childWatchersToRem);
-            HashSet<Watcher> dataWatchersToRem = new HashSet<>();
-            removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
-            HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
-            removedWatchers.put(EventType.PersistentWatchRemoved, 
persistentWatchersToRem);
-            boolean removedWatcher = false;
-            switch (watcherType) {
-            case Children: {
-                synchronized (childWatches) {
-                    removedWatcher = removeWatches(childWatches, watcher, 
clientPath, local, rc, childWatchersToRem);
-                }
-                break;
-            }
-            case Data: {
-                synchronized (dataWatches) {
-                    removedWatcher = removeWatches(dataWatches, watcher, 
clientPath, local, rc, dataWatchersToRem);
-                }
-
-                synchronized (existWatches) {
-                    boolean removedDataWatcher = removeWatches(existWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
-                    removedWatcher |= removedDataWatcher;
-                }
-                break;
-            }
-            case Any: {
-                synchronized (childWatches) {
-                    removedWatcher = removeWatches(childWatches, watcher, 
clientPath, local, rc, childWatchersToRem);
-                }
-
-                synchronized (dataWatches) {
-                    boolean removedDataWatcher = removeWatches(dataWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
-                    removedWatcher |= removedDataWatcher;
-                }
-
-                synchronized (existWatches) {
-                    boolean removedDataWatcher = removeWatches(existWatches, 
watcher, clientPath, local, rc, dataWatchersToRem);
-                    removedWatcher |= removedDataWatcher;
-                }
-
-                synchronized (persistentWatches) {
-                    boolean removedPersistentWatcher = 
removeWatches(persistentWatches,
-                            watcher, clientPath, local, rc, 
persistentWatchersToRem);
-                    removedWatcher |= removedPersistentWatcher;
-                }
-
-                synchronized (persistentRecursiveWatches) {
-                    boolean removedPersistentRecursiveWatcher = 
removeWatches(persistentRecursiveWatches,
-                            watcher, clientPath, local, rc, 
persistentWatchersToRem);
-                    removedWatcher |= removedPersistentRecursiveWatcher;
-                }
-            }
-            }
-            // Watcher function doesn't exists for the specified params
-            if (!removedWatcher) {
-                throw new KeeperException.NoWatcherException(clientPath);
-            }
-            return removedWatchers;
-        }
-
-        private boolean contains(String path, Watcher watcherObj, Map<String, 
Set<Watcher>> pathVsWatchers) {
-            boolean watcherExists = true;
-            if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
-                watcherExists = false;
-            } else {
-                Set<Watcher> watchers = pathVsWatchers.get(path);
-                if (watchers == null) {
-                    watcherExists = false;
-                } else if (watcherObj == null) {
-                    watcherExists = watchers.size() > 0;
-                } else {
-                    watcherExists = watchers.contains(watcherObj);
-                }
-            }
-            return watcherExists;
-        }
-
-        /**
-         * Validate the provided znode path contains the given watcher and
-         * watcherType
-         *
-         * @param path
-         *            - client path
-         * @param watcher
-         *            - watcher object reference
-         * @param watcherType
-         *            - type of the watcher
-         * @throws NoWatcherException
-         */
-        void containsWatcher(String path, Watcher watcher, WatcherType 
watcherType) throws NoWatcherException {
-            boolean containsWatcher = false;
-            switch (watcherType) {
-            case Children: {
-                synchronized (childWatches) {
-                    containsWatcher = contains(path, watcher, childWatches);
-                }
-
-                synchronized (persistentWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (persistentRecursiveWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentRecursiveWatches);
-                    containsWatcher |= contains_temp;
-                }
-                break;
-            }
-            case Data: {
-                synchronized (dataWatches) {
-                    containsWatcher = contains(path, watcher, dataWatches);
-                }
-
-                synchronized (existWatches) {
-                    boolean contains_temp = contains(path, watcher, 
existWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (persistentWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (persistentRecursiveWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentRecursiveWatches);
-                    containsWatcher |= contains_temp;
-                }
-                break;
-            }
-            case Any: {
-                synchronized (childWatches) {
-                    containsWatcher = contains(path, watcher, childWatches);
-                }
-
-                synchronized (dataWatches) {
-                    boolean contains_temp = contains(path, watcher, 
dataWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (existWatches) {
-                    boolean contains_temp = contains(path, watcher, 
existWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (persistentWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentWatches);
-                    containsWatcher |= contains_temp;
-                }
-
-                synchronized (persistentRecursiveWatches) {
-                    boolean contains_temp = contains(path, watcher,
-                            persistentRecursiveWatches);
-                    containsWatcher |= contains_temp;
-                }
-            }
-            }
-            // Watcher function doesn't exists for the specified params
-            if (!containsWatcher) {
-                throw new KeeperException.NoWatcherException(path);
-            }
-        }
-
-        protected boolean removeWatches(
-            Map<String, Set<Watcher>> pathVsWatcher,
-            Watcher watcher,
-            String path,
-            boolean local,
-            int rc,
-            Set<Watcher> removedWatchers) throws KeeperException {
-            if (!local && rc != Code.OK.intValue()) {
-                throw KeeperException.create(KeeperException.Code.get(rc), 
path);
-            }
-            boolean success = false;
-            // When local flag is true, remove watchers for the given path
-            // irrespective of rc. Otherwise shouldn't remove watchers locally
-            // when sees failure from server.
-            if (rc == Code.OK.intValue() || (local && rc != 
Code.OK.intValue())) {
-                // Remove all the watchers for the given path
-                if (watcher == null) {
-                    Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
-                    if (pathWatchers != null) {
-                        // found path watchers
-                        removedWatchers.addAll(pathWatchers);
-                        success = true;
-                    }
-                } else {
-                    Set<Watcher> watchers = pathVsWatcher.get(path);
-                    if (watchers != null) {
-                        if (watchers.remove(watcher)) {
-                            // found path watcher
-                            removedWatchers.add(watcher);
-                            // cleanup <path vs watchlist>
-                            if (watchers.size() <= 0) {
-                                pathVsWatcher.remove(path);
-                            }
-                            success = true;
-                        }
-                    }
-                }
-            }
-            return success;
-        }
-
-        /* (non-Javadoc)
-         * @see 
org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
-         *                                                        
Event.EventType, java.lang.String)
-         */
-        @Override
-        public Set<Watcher> materialize(
-            Watcher.Event.KeeperState state,
-            Watcher.Event.EventType type,
-            String clientPath
-        ) {
-            final Set<Watcher> result = new HashSet<>();
-
-            switch (type) {
-            case None:
-                if (defaultWatcher != null) {
-                    result.add(defaultWatcher);
-                }
-
-                boolean clear = disableAutoWatchReset && state != 
Watcher.Event.KeeperState.SyncConnected;
-                synchronized (dataWatches) {
-                    for (Set<Watcher> ws : dataWatches.values()) {
-                        result.addAll(ws);
-                    }
-                    if (clear) {
-                        dataWatches.clear();
-                    }
-                }
-
-                synchronized (existWatches) {
-                    for (Set<Watcher> ws : existWatches.values()) {
-                        result.addAll(ws);
-                    }
-                    if (clear) {
-                        existWatches.clear();
-                    }
-                }
-
-                synchronized (childWatches) {
-                    for (Set<Watcher> ws : childWatches.values()) {
-                        result.addAll(ws);
-                    }
-                    if (clear) {
-                        childWatches.clear();
-                    }
-                }
-
-                synchronized (persistentWatches) {
-                    for (Set<Watcher> ws: persistentWatches.values()) {
-                        result.addAll(ws);
-                    }
-                }
-
-                synchronized (persistentRecursiveWatches) {
-                    for (Set<Watcher> ws: persistentRecursiveWatches.values()) 
{
-                        result.addAll(ws);
-                    }
-                }
-
-                return result;
-            case NodeDataChanged:
-            case NodeCreated:
-                synchronized (dataWatches) {
-                    addTo(dataWatches.remove(clientPath), result);
-                }
-                synchronized (existWatches) {
-                    addTo(existWatches.remove(clientPath), result);
-                }
-                addPersistentWatches(clientPath, result);
-                break;
-            case NodeChildrenChanged:
-                synchronized (childWatches) {
-                    addTo(childWatches.remove(clientPath), result);
-                }
-                addPersistentWatches(clientPath, result);
-                break;
-            case NodeDeleted:
-                synchronized (dataWatches) {
-                    addTo(dataWatches.remove(clientPath), result);
-                }
-                // TODO This shouldn't be needed, but just in case
-                synchronized (existWatches) {
-                    Set<Watcher> list = existWatches.remove(clientPath);
-                    if (list != null) {
-                        addTo(list, result);
-                        LOG.warn("We are triggering an exists watch for 
delete! Shouldn't happen!");
-                    }
-                }
-                synchronized (childWatches) {
-                    addTo(childWatches.remove(clientPath), result);
-                }
-                addPersistentWatches(clientPath, result);
-                break;
-            default:
-                String errorMsg = String.format(
-                    "Unhandled watch event type %s with state %s on path %s",
-                    type,
-                    state,
-                    clientPath);
-                LOG.error(errorMsg);
-                throw new RuntimeException(errorMsg);
-            }
-
-            return result;
-        }
-
-        private void addPersistentWatches(String clientPath, Set<Watcher> 
result) {
-            synchronized (persistentWatches) {
-                addTo(persistentWatches.get(clientPath), result);
-            }
-            synchronized (persistentRecursiveWatches) {
-                for (String path : 
PathParentIterator.forAll(clientPath).asIterable()) {
-                    addTo(persistentRecursiveWatches.get(path), result);
-                }
-            }
-        }
+    ZKWatchManager getWatchManager() {
+        return cnxn.getWatcherManager();
     }
 
     /**
      * Register a watcher for a particular path.
      */
-    public abstract class WatchRegistration {
+    public abstract static class WatchRegistration {
 
         private Watcher watcher;
         private String clientPath;
+
         public WatchRegistration(Watcher watcher, String clientPath) {
             this.watcher = watcher;
             this.clientPath = clientPath;
@@ -689,7 +312,7 @@ public class ZooKeeper implements AutoCloseable {
 
         @Override
         protected Map<String, Set<Watcher>> getWatches(int rc) {
-            return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
+            return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();
         }
 
         @Override
@@ -707,7 +330,7 @@ public class ZooKeeper implements AutoCloseable {
 
         @Override
         protected Map<String, Set<Watcher>> getWatches(int rc) {
-            return watchManager.dataWatches;
+            return getWatchManager().getDataWatches();
         }
 
     }
@@ -720,7 +343,7 @@ public class ZooKeeper implements AutoCloseable {
 
         @Override
         protected Map<String, Set<Watcher>> getWatches(int rc) {
-            return watchManager.childWatches;
+            return getWatchManager().getChildWatches();
         }
 
     }
@@ -737,9 +360,9 @@ public class ZooKeeper implements AutoCloseable {
         protected Map<String, Set<Watcher>> getWatches(int rc) {
             switch (mode) {
                 case PERSISTENT:
-                    return watchManager.persistentWatches;
+                    return getWatchManager().getPersistentWatches();
                 case PERSISTENT_RECURSIVE:
-                    return watchManager.persistentRecursiveWatches;
+                    return getWatchManager().getPersistentRecursiveWatches();
             }
             throw new IllegalArgumentException("Mode not supported: " + mode);
         }
@@ -989,7 +612,7 @@ public class ZooKeeper implements AutoCloseable {
      *            connects to one in read-only mode, i.e. read requests are
      *            allowed while write requests are not. It continues seeking 
for
      *            majority in the background.
-     * @param aHostProvider
+     * @param hostProvider
      *            use this as HostProvider to enable custom behaviour.
      * @param clientConfig
      *            (added in 3.5.2) passing this conf object gives each client 
the flexibility of
@@ -1004,49 +627,45 @@ public class ZooKeeper implements AutoCloseable {
         int sessionTimeout,
         Watcher watcher,
         boolean canBeReadOnly,
-        HostProvider aHostProvider,
-        ZKClientConfig clientConfig) throws IOException {
+        HostProvider hostProvider,
+        ZKClientConfig clientConfig
+    ) throws IOException {
         LOG.info(
             "Initiating client connection, connectString={} sessionTimeout={} 
watcher={}",
             connectString,
             sessionTimeout,
             watcher);
 
-        if (clientConfig == null) {
-            clientConfig = new ZKClientConfig();
-        }
-        this.clientConfig = clientConfig;
-        watchManager = defaultWatchManager();
-        watchManager.defaultWatcher = watcher;
+        this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
+        this.hostProvider = hostProvider;
         ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
-        hostProvider = aHostProvider;
 
         cnxn = createConnection(
             connectStringParser.getChrootPath(),
             hostProvider,
             sessionTimeout,
-            this,
-            watchManager,
+            this.clientConfig,
+            watcher,
             getClientCnxnSocket(),
             canBeReadOnly);
         cnxn.start();
     }
 
-    // @VisibleForTesting
-    protected ClientCnxn createConnection(
+    ClientCnxn createConnection(
         String chrootPath,
         HostProvider hostProvider,
         int sessionTimeout,
-        ZooKeeper zooKeeper,
-        ClientWatchManager watcher,
+        ZKClientConfig clientConfig,
+        Watcher defaultWatcher,
         ClientCnxnSocket clientCnxnSocket,
-        boolean canBeReadOnly) throws IOException {
+        boolean canBeReadOnly
+    ) throws IOException {
         return new ClientCnxn(
             chrootPath,
             hostProvider,
             sessionTimeout,
-            this,
-            watchManager,
+            clientConfig,
+            defaultWatcher,
             clientCnxnSocket,
             canBeReadOnly);
     }
@@ -1383,7 +1002,7 @@ public class ZooKeeper implements AutoCloseable {
      *            connects to one in read-only mode, i.e. read requests are
      *            allowed while write requests are not. It continues seeking 
for
      *            majority in the background.
-     * @param aHostProvider
+     * @param hostProvider
      *            use this as HostProvider to enable custom behaviour.
      * @param clientConfig
      *            (added in 3.5.2) passing this conf object gives each client 
the flexibility of
@@ -1400,7 +1019,7 @@ public class ZooKeeper implements AutoCloseable {
         long sessionId,
         byte[] sessionPasswd,
         boolean canBeReadOnly,
-        HostProvider aHostProvider,
+        HostProvider hostProvider,
         ZKClientConfig clientConfig) throws IOException {
         LOG.info(
             "Initiating client connection, connectString={} "
@@ -1411,22 +1030,16 @@ public class ZooKeeper implements AutoCloseable {
             Long.toHexString(sessionId),
             (sessionPasswd == null ? "<null>" : "<hidden>"));
 
-        if (clientConfig == null) {
-            clientConfig = new ZKClientConfig();
-        }
-        this.clientConfig = clientConfig;
-        watchManager = defaultWatchManager();
-        watchManager.defaultWatcher = watcher;
-
+        this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
         ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
-        hostProvider = aHostProvider;
+        this.hostProvider = hostProvider;
 
         cnxn = new ClientCnxn(
             connectStringParser.getChrootPath(),
             hostProvider,
             sessionTimeout,
-            this,
-            watchManager,
+            this.clientConfig,
+            watcher,
             getClientCnxnSocket(),
             sessionId,
             sessionPasswd,
@@ -1523,11 +1136,6 @@ public class ZooKeeper implements AutoCloseable {
         return new ZooKeeperTestable(cnxn);
     }
 
-    /* Useful for testing watch handling behavior */
-    protected ZKWatchManager defaultWatchManager() {
-        return new 
ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
-    }
-
     /**
      * The session id for this ZooKeeper client instance. The value returned is
      * not valid until the client connects to a server and may change after a
@@ -1582,11 +1190,9 @@ public class ZooKeeper implements AutoCloseable {
     /**
      * Specify the default watcher for the connection (overrides the one
      * specified during construction).
-     *
-     * @param watcher
      */
     public synchronized void register(Watcher watcher) {
-        watchManager.defaultWatcher = watcher;
+        getWatchManager().setDefaultWatcher(watcher);
     }
 
     /**
@@ -3213,9 +2819,11 @@ public class ZooKeeper implements AutoCloseable {
      *  error code.
      * @since 3.6.0
      */
-    public void addWatch(String basePath, AddWatchMode mode)
-            throws KeeperException, InterruptedException {
-        addWatch(basePath, watchManager.defaultWatcher, mode);
+    public void addWatch(
+            String basePath,
+            AddWatchMode mode
+    ) throws KeeperException, InterruptedException {
+        addWatch(basePath, getWatchManager().getDefaultWatcher(), mode);
     }
 
     /**
@@ -3229,8 +2837,12 @@ public class ZooKeeper implements AutoCloseable {
      * @throws IllegalArgumentException if an invalid path is specified
      * @since 3.6.0
      */
-    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode,
-                         VoidCallback cb, Object ctx) {
+    public void addWatch(
+            String basePath,
+            Watcher watcher, AddWatchMode mode,
+            VoidCallback cb,
+            Object ctx
+    ) {
         PathUtils.validatePath(basePath);
         String serverPath = prependChroot(basePath);
 
@@ -3251,9 +2863,8 @@ public class ZooKeeper implements AutoCloseable {
      * @throws IllegalArgumentException if an invalid path is specified
      * @since 3.6.0
      */
-    public void addWatch(String basePath, AddWatchMode mode,
-                         VoidCallback cb, Object ctx) {
-        addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx);
+    public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) {
+        addWatch(basePath, getWatchManager().getDefaultWatcher(), mode, cb, ctx);
     }
 
     private void validateWatcher(Watcher watcher) {
@@ -3271,7 +2882,7 @@ public class ZooKeeper implements AutoCloseable {
         PathUtils.validatePath(path);
         final String clientPath = path;
         final String serverPath = prependChroot(clientPath);
-        WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
+        WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
 
         RequestHeader h = new RequestHeader();
         h.setType(opCode);
@@ -3294,7 +2905,7 @@ public class ZooKeeper implements AutoCloseable {
         PathUtils.validatePath(path);
         final String clientPath = path;
         final String serverPath = prependChroot(clientPath);
-        WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
+        WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
 
         RequestHeader h = new RequestHeader();
         h.setType(opCode);
@@ -3424,8 +3035,9 @@ public class ZooKeeper implements AutoCloseable {
      */
     private Watcher getDefaultWatcher(boolean required) {
         if (required) {
-            if (watchManager.defaultWatcher != null) {
-                return watchManager.defaultWatcher;
+            final Watcher defaultWatcher = getWatchManager().getDefaultWatcher();
+            if (defaultWatcher != null) {
+                return defaultWatcher;
             } else {
                 throw new IllegalStateException("Default watcher is required, but it is null.");
             }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 2c7f6bd..5ed43fc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -281,11 +281,19 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
             String chrootPath,
             HostProvider hostProvider,
             int sessionTimeout,
-            ZooKeeper zooKeeper,
-            ClientWatchManager watcher,
+            ZKClientConfig zkClientConfig,
+            Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
-            boolean canBeReadOnly) throws IOException {
-            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+            boolean canBeReadOnly
+        ) throws IOException {
+            super(
+                chrootPath,
+                hostProvider,
+                sessionTimeout,
+                zkClientConfig,
+                defaultWatcher,
+                clientCnxnSocket,
+                canBeReadOnly);
         }
 
         void attemptClose() {
@@ -344,18 +352,25 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
             return cnxn.getState().isAlive();
         }
 
-        @Override
-        protected ClientCnxn createConnection(
+        ClientCnxn createConnection(
             String chrootPath,
             HostProvider hostProvider,
             int sessionTimeout,
-            ZooKeeper zooKeeper,
-            ClientWatchManager watcher,
+            ZKClientConfig clientConfig,
+            Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
-            boolean canBeReadOnly) throws IOException {
+            boolean canBeReadOnly
+        ) throws IOException {
             Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
             socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
-            ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+            ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
+                chrootPath,
+                hostProvider,
+                sessionTimeout,
+                clientConfig,
+                defaultWatcher,
+                clientCnxnSocket,
+                canBeReadOnly);
             return ClientCnxnSocketFragilityTest.this.cnxn;
         }
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
index e77283c..41f52ea 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
@@ -66,8 +66,14 @@ public class ClientReconnectTest extends ZKTestCase {
         sc = SocketChannel.open();
 
         ClientCnxnSocketNIO nioCnxn = new MockCnxn();
-        ClientWatchManager watcher = mock(ClientWatchManager.class);
-        ClientCnxn clientCnxn = new ClientCnxn("tmp", hostProvider, 5000, zk, watcher, nioCnxn, false);
+        ClientCnxn clientCnxn = new ClientCnxn(
+            "tmp",
+            hostProvider,
+            5000,
+            zk.getClientConfig(),
+            DummyWatcher.INSTANCE,
+            nioCnxn,
+            false);
         clientCnxn.start();
         countDownLatch.await(5000, TimeUnit.MILLISECONDS);
         assertTrue(countDownLatch.getCount() == 0);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index 7c3bf51..d4f95ae 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -110,15 +111,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
 
     class CustomClientCnxn extends ClientCnxn {
 
-        public CustomClientCnxn(
+        CustomClientCnxn(
             String chrootPath,
             HostProvider hostProvider,
             int sessionTimeout,
-            ZooKeeper zooKeeper,
-            ClientWatchManager watcher,
+            ZKClientConfig clientConfig,
+            Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
-            boolean canBeReadOnly) throws IOException {
-            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+            boolean canBeReadOnly
+        ) throws IOException {
+            super(
+                chrootPath,
+                hostProvider,
+                sessionTimeout,
+                clientConfig,
+                defaultWatcher,
+                clientCnxnSocket,
+                canBeReadOnly);
         }
 
         @Override
@@ -140,15 +149,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
         }
 
         @Override
-        protected ClientCnxn createConnection(
+        ClientCnxn createConnection(
             String chrootPath,
             HostProvider hostProvider,
             int sessionTimeout,
-            ZooKeeper zooKeeper,
-            ClientWatchManager watcher,
+            ZKClientConfig clientConfig,
+            Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
-            boolean canBeReadOnly) throws IOException {
-            return new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+            boolean canBeReadOnly
+        ) throws IOException {
+            return new CustomClientCnxn(
+                chrootPath,
+                hostProvider,
+                sessionTimeout,
+                clientConfig,
+                defaultWatcher,
+                clientCnxnSocket,
+                canBeReadOnly);
         }
 
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
index 30f1558..8bb91e3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.zookeeper;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,11 +41,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoWatcherException;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Test;
@@ -657,7 +659,9 @@ public class RemoveWatchesTest extends ClientBase {
     @Test(timeout = 90000)
     public void testNoWatcherServerException() throws InterruptedException, IOException, TimeoutException {
         CountdownWatcher watcher = new CountdownWatcher();
-        MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
+        ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher));
+        MyWatchManager watchManager = new MyWatchManager(false, watcher);
+        doReturn(watchManager).when(zk).getWatchManager();
         boolean nw = false;
 
         watcher.waitForConnected(CONNECTION_TIMEOUT);
@@ -670,8 +674,8 @@ public class RemoveWatchesTest extends ClientBase {
             }
         }
 
-        assertTrue("Server didn't return NOWATCHER", zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue());
-        assertTrue("NoWatcherException didn't happen", nw);
+        assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
+        assertThat("NoWatcherException didn't happen", nw, is(true));
     }
 
     /**
@@ -920,54 +924,32 @@ public class RemoveWatchesTest extends ClientBase {
         assertEquals("Received watch notification after removal!", 2, watchCount.getCount());
     }
 
-    /* a mocked ZK class that doesn't do client-side verification
-     * before/after calling removeWatches */
-    private class MyZooKeeper extends ZooKeeper {
+    private static class MyWatchManager extends ZKWatchManager {
 
-        class MyWatchManager extends ZKWatchManager {
+        int lastReturnCode;
 
-            public MyWatchManager(boolean disableAutoWatchReset) {
-                super(disableAutoWatchReset);
-            }
-
-            public int lastrc;
-
-            /* Pretend that any watcher exists */
-            void containsWatcher(String path, Watcher watcher, WatcherType watcherType) throws NoWatcherException {
-            }
-
-            /* save the return error code by the server */
-            protected boolean removeWatches(
-                Map<String, Set<Watcher>> pathVsWatcher,
-                Watcher watcher,
-                String path,
-                boolean local,
-                int rc,
-                Set<Watcher> removedWatchers) throws KeeperException {
-                lastrc = rc;
-                return false;
-            }
-
-        }
-
-        public MyZooKeeper(String hp, int timeout, Watcher watcher) throws IOException {
-            super(hp, timeout, watcher, false);
+        MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
+            super(disableAutoWatchReset, defaultWatcher);
         }
 
-        private MyWatchManager myWatchManager;
-
-        protected ZKWatchManager defaultWatchManager() {
-            myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
-            return myWatchManager;
+        void containsWatcher(String path, Watcher watcher, WatcherType watcherType) {
+            // prevent contains watcher
         }
 
-        public int getRemoveWatchesRC() {
-            return myWatchManager.lastrc;
+        @Override
+        protected boolean removeWatches(
+            Map<String, Set<Watcher>> pathVsWatcher,
+            Watcher watcher,
+            String path,
+            boolean local,
+            int rc,
+            Set<Watcher> removedWatchers) {
+            lastReturnCode = rc;
+            return false;
         }
-
     }
 
-    private class MyWatcher implements Watcher {
+    private static class MyWatcher implements Watcher {
 
         private final String path;
         private String eventPath;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
index a02127a..7f9e41e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
@@ -20,12 +20,10 @@ package org.apache.zookeeper;
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.jute.Record;
 import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 
@@ -35,76 +33,12 @@ public class TestableZooKeeper extends ZooKeeperAdmin {
         super(host, sessionTimeout, watcher);
     }
 
-    class TestableClientCnxn extends ClientCnxn {
-
-        TestableClientCnxn(
-            String chrootPath,
-            HostProvider hostProvider,
-            int sessionTimeout,
-            ZooKeeper zooKeeper,
-            ClientWatchManager watcher,
-            ClientCnxnSocket clientCnxnSocket,
-            boolean canBeReadOnly) throws IOException {
-            super(chrootPath,
-                  hostProvider,
-                  sessionTimeout,
-                  zooKeeper,
-                  watcher,
-                  clientCnxnSocket,
-                  0,
-                  new byte[16],
-                  canBeReadOnly);
-        }
-
-        void setXid(int newXid) {
-            xid = newXid;
-        }
-
-        int checkXid() {
-            return xid;
-        }
-
-    }
-
-    protected ClientCnxn createConnection(
-        String chrootPath,
-        HostProvider hostProvider,
-        int sessionTimeout,
-        ZooKeeper zooKeeper,
-        ClientWatchManager watcher,
-        ClientCnxnSocket clientCnxnSocket,
-        boolean canBeReadOnly) throws IOException {
-        return new TestableClientCnxn(
-            chrootPath,
-            hostProvider,
-            sessionTimeout,
-            this,
-            watcher,
-            clientCnxnSocket,
-            canBeReadOnly);
-    }
-
     public void setXid(int xid) {
-        ((TestableClientCnxn) cnxn).setXid(xid);
+        cnxn.xid = xid;
     }
 
     public int checkXid() {
-        return ((TestableClientCnxn) cnxn).checkXid();
-    }
-
-    @Override
-    public List<String> getChildWatches() {
-        return super.getChildWatches();
-    }
-
-    @Override
-    public List<String> getDataWatches() {
-        return super.getDataWatches();
-    }
-
-    @Override
-    public List<String> getExistWatches() {
-        return super.getExistWatches();
+        return cnxn.xid;
     }
 
     /**

Reply via email to