This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new d99c9e8 ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooKeeper d99c9e8 is described below commit d99c9e8b701ebdbd3f0098d4d81696df97432262 Author: tison <wander4...@gmail.com> AuthorDate: Sun May 17 15:13:37 2020 +0200 ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooKeeper 1. Extract ZKWatchManager to single file 2. Move ZKWatchManager instance to ClientCnxn in order to eliminate cycle dependency 3. let `ZooKeeper` syncs a copy of default watcher, in order to reduce dependencies to `getWatchManager()` Author: tison <wander4...@gmail.com> Reviewers: Enrico Olivelli <eolive...@apache.org>, Andor Molnar <an...@apache.org> Closes #1095 from TisonKun/ZOOKEEPER-837 --- .../main/java/org/apache/zookeeper/ClientCnxn.java | 110 ++--- .../org/apache/zookeeper/WatchDeregistration.java | 1 - .../java/org/apache/zookeeper/ZKWatchManager.java | 455 +++++++++++++++++++ .../main/java/org/apache/zookeeper/ZooKeeper.java | 504 +++------------------ .../zookeeper/ClientCnxnSocketFragilityTest.java | 35 +- .../org/apache/zookeeper/ClientReconnectTest.java | 10 +- .../apache/zookeeper/ClientRequestTimeoutTest.java | 37 +- .../org/apache/zookeeper/RemoveWatchesTest.java | 70 ++- .../org/apache/zookeeper/TestableZooKeeper.java | 70 +-- 9 files changed, 651 insertions(+), 641 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c87d3cb..87094b8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -165,13 +165,11 @@ public class ClientCnxn { private final int sessionTimeout; - private final ZooKeeper zooKeeper; - - private final ClientWatchManager watcher; + private final ZKWatchManager watchManager; private long sessionId; - private byte[] sessionPasswd = new byte[16]; + private byte[] sessionPasswd; /** * If true, the connection is allowed to go to r-o mode. This field's value @@ -224,6 +222,10 @@ public class ClientCnxn { */ private long requestTimeout; + ZKWatchManager getWatcherManager() { + return watchManager; + } + public long getSessionId() { return sessionId; } @@ -362,35 +364,29 @@ public class ClientCnxn { * established until needed. The start() instance method must be called * subsequent to construction. * - * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 - * @param hostProvider - * the list of ZooKeeper servers to connect to - * @param sessionTimeout - * the timeout for connections. - * @param zooKeeper - * the zookeeper object that this connection is related to. - * @param watcher watcher for this connection - * @param clientCnxnSocket - * the socket implementation used (e.g. NIO/Netty) - * @param canBeReadOnly - * whether the connection is allowed to go to read-only - * mode in case of partitioning - * @throws IOException + * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider the list of ZooKeeper servers to connect to + * @param sessionTimeout the timeout for connections. + * @param clientConfig the client configuration. + * @param defaultWatcher default watcher for this connection + * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) + * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning */ public ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { + boolean canBeReadOnly + ) throws IOException { this( chrootPath, hostProvider, sessionTimeout, - zooKeeper, - watcher, + clientConfig, + defaultWatcher, clientCnxnSocket, 0, new byte[16], @@ -402,48 +398,45 @@ public class ClientCnxn { * established until needed. The start() instance method must be called * subsequent to construction. * - * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 - * @param hostProvider - * the list of ZooKeeper servers to connect to - * @param sessionTimeout - * the timeout for connections. - * @param zooKeeper - * the zookeeper object that this connection is related to. - * @param watcher watcher for this connection - * @param clientCnxnSocket - * the socket implementation used (e.g. NIO/Netty) + * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider the list of ZooKeeper servers to connect to + * @param sessionTimeout the timeout for connections. + * @param clientConfig the client configuration. + * @param defaultWatcher default watcher for this connection + * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) * @param sessionId session id if re-establishing session * @param sessionPasswd session passwd if re-establishing session - * @param canBeReadOnly - * whether the connection is allowed to go to read-only - * mode in case of partitioning + * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning * @throws IOException in cases of broken network */ public ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, - boolean canBeReadOnly) throws IOException { - this.zooKeeper = zooKeeper; - this.watcher = watcher; + boolean canBeReadOnly + ) throws IOException { + this.chrootPath = chrootPath; + this.hostProvider = hostProvider; + this.sessionTimeout = sessionTimeout; + this.clientConfig = clientConfig; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; - this.sessionTimeout = sessionTimeout; - this.hostProvider = hostProvider; - this.chrootPath = chrootPath; + this.readOnly = canBeReadOnly; + + this.watchManager = new ZKWatchManager( + clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET), + defaultWatcher); - connectTimeout = sessionTimeout / hostProvider.size(); - readTimeout = sessionTimeout * 2 / 3; - readOnly = canBeReadOnly; + this.connectTimeout = sessionTimeout / hostProvider.size(); + this.readTimeout = sessionTimeout * 2 / 3; - sendThread = new SendThread(clientCnxnSocket); - eventThread = new EventThread(); - this.clientConfig = zooKeeper.getClientConfig(); + this.sendThread = new SendThread(clientCnxnSocket); + this.eventThread = new EventThread(); initRequestTimeout(); } @@ -506,10 +499,9 @@ public class ClientCnxn { final Set<Watcher> watchers; if (materializedWatchers == null) { // materialize the watchers based on the event - watchers = watcher.materialize(event.getState(), event.getType(), event.getPath()); + watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath()); } else { - watchers = new HashSet<Watcher>(); - watchers.addAll(materializedWatchers); + watchers = new HashSet<>(materializedWatchers); } WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); // queue the pair (watch set & event) for later processing @@ -1007,14 +999,12 @@ public class ClientCnxn { ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); // We add backwards since we are pushing into the front // Only send if there's a pending watch - // TODO: here we have the only remaining use of zooKeeper in - // this class. It's to be eliminated! if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) { - List<String> dataWatches = zooKeeper.getDataWatches(); - List<String> existWatches = zooKeeper.getExistWatches(); - List<String> childWatches = zooKeeper.getChildWatches(); - List<String> persistentWatches = zooKeeper.getPersistentWatches(); - List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches(); + List<String> dataWatches = watchManager.getDataWatchList(); + List<String> existWatches = watchManager.getExistWatchList(); + List<String> childWatches = watchManager.getChildWatchList(); + List<String> persistentWatches = watchManager.getPersistentWatchList(); + List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty() || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) { Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java index 16c7f84..710f47b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Set; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; -import org.apache.zookeeper.ZooKeeper.ZKWatchManager; /** * Handles the special case of removing watches which has registered for a diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java new file mode 100644 index 0000000..95a07f0 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.zookeeper.server.watch.PathParentIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage watchers and handle events generated by the {@link ClientCnxn} object. + * + * This class is intended to be packaged-private so that it doesn't serve + * as part of ZooKeeper client API. + */ +class ZKWatchManager implements ClientWatchManager { + + private static final Logger LOG = LoggerFactory.getLogger(ZKWatchManager.class); + + private final Map<String, Set<Watcher>> dataWatches = new HashMap<>(); + private final Map<String, Set<Watcher>> existWatches = new HashMap<>(); + private final Map<String, Set<Watcher>> childWatches = new HashMap<>(); + private final Map<String, Set<Watcher>> persistentWatches = new HashMap<>(); + private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<>(); + private final boolean disableAutoWatchReset; + + private volatile Watcher defaultWatcher; + + ZKWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) { + this.disableAutoWatchReset = disableAutoWatchReset; + this.defaultWatcher = defaultWatcher; + } + + void setDefaultWatcher(Watcher defaultWatcher) { + this.defaultWatcher = defaultWatcher; + } + + Watcher getDefaultWatcher() { + return defaultWatcher; + } + + List<String> getDataWatchList() { + synchronized (dataWatches) { + return new ArrayList<>(dataWatches.keySet()); + } + } + + List<String> getChildWatchList() { + synchronized (childWatches) { + return new ArrayList<>(childWatches.keySet()); + } + } + + List<String> getExistWatchList() { + synchronized (existWatches) { + return new ArrayList<>(existWatches.keySet()); + } + } + + List<String> getPersistentWatchList() { + synchronized (persistentWatches) { + return new ArrayList<>(persistentWatches.keySet()); + } + } + + List<String> getPersistentRecursiveWatchList() { + synchronized (persistentRecursiveWatches) { + return new ArrayList<>(persistentRecursiveWatches.keySet()); + } + } + + Map<String, Set<Watcher>> getDataWatches() { + return dataWatches; + } + + Map<String, Set<Watcher>> getExistWatches() { + return existWatches; + } + + Map<String, Set<Watcher>> getChildWatches() { + return childWatches; + } + + Map<String, Set<Watcher>> getPersistentWatches() { + return persistentWatches; + } + + Map<String, Set<Watcher>> getPersistentRecursiveWatches() { + return persistentRecursiveWatches; + } + + private void addTo(Set<Watcher> from, Set<Watcher> to) { + if (from != null) { + to.addAll(from); + } + } + + public Map<Watcher.Event.EventType, Set<Watcher>> removeWatcher( + String clientPath, + Watcher watcher, + Watcher.WatcherType watcherType, + boolean local, + int rc + ) throws KeeperException { + // Validate the provided znode path contains the given watcher of + // watcherType + containsWatcher(clientPath, watcher, watcherType); + + Map<Watcher.Event.EventType, Set<Watcher>> removedWatchers = new HashMap<>(); + HashSet<Watcher> childWatchersToRem = new HashSet<>(); + removedWatchers.put(Watcher.Event.EventType.ChildWatchRemoved, childWatchersToRem); + HashSet<Watcher> dataWatchersToRem = new HashSet<>(); + removedWatchers.put(Watcher.Event.EventType.DataWatchRemoved, dataWatchersToRem); + HashSet<Watcher> persistentWatchersToRem = new HashSet<>(); + removedWatchers.put(Watcher.Event.EventType.PersistentWatchRemoved, persistentWatchersToRem); + boolean removedWatcher = false; + switch (watcherType) { + case Children: { + synchronized (childWatches) { + removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem); + } + break; + } + case Data: { + synchronized (dataWatches) { + removedWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem); + } + + synchronized (existWatches) { + boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem); + removedWatcher |= removedDataWatcher; + } + break; + } + case Any: { + synchronized (childWatches) { + removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem); + } + + synchronized (dataWatches) { + boolean removedDataWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem); + removedWatcher |= removedDataWatcher; + } + + synchronized (existWatches) { + boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem); + removedWatcher |= removedDataWatcher; + } + + synchronized (persistentWatches) { + boolean removedPersistentWatcher = removeWatches(persistentWatches, + watcher, clientPath, local, rc, persistentWatchersToRem); + removedWatcher |= removedPersistentWatcher; + } + + synchronized (persistentRecursiveWatches) { + boolean removedPersistentRecursiveWatcher = removeWatches(persistentRecursiveWatches, + watcher, clientPath, local, rc, persistentWatchersToRem); + removedWatcher |= removedPersistentRecursiveWatcher; + } + } + } + // Watcher function doesn't exists for the specified params + if (!removedWatcher) { + throw new KeeperException.NoWatcherException(clientPath); + } + return removedWatchers; + } + + private boolean contains(String path, Watcher watcherObj, Map<String, Set<Watcher>> pathVsWatchers) { + boolean watcherExists = true; + if (pathVsWatchers == null || pathVsWatchers.size() == 0) { + watcherExists = false; + } else { + Set<Watcher> watchers = pathVsWatchers.get(path); + if (watchers == null) { + watcherExists = false; + } else if (watcherObj == null) { + watcherExists = watchers.size() > 0; + } else { + watcherExists = watchers.contains(watcherObj); + } + } + return watcherExists; + } + + /** + * Validate the provided znode path contains the given watcher and + * watcherType + * + * @param path + * - client path + * @param watcher + * - watcher object reference + * @param watcherType + * - type of the watcher + * @throws KeeperException.NoWatcherException + */ + void containsWatcher(String path, Watcher watcher, Watcher.WatcherType watcherType) throws + KeeperException.NoWatcherException { + boolean containsWatcher = false; + switch (watcherType) { + case Children: { + synchronized (childWatches) { + containsWatcher = contains(path, watcher, childWatches); + } + + synchronized (persistentWatches) { + boolean contains_temp = contains(path, watcher, + persistentWatches); + containsWatcher |= contains_temp; + } + + synchronized (persistentRecursiveWatches) { + boolean contains_temp = contains(path, watcher, + persistentRecursiveWatches); + containsWatcher |= contains_temp; + } + break; + } + case Data: { + synchronized (dataWatches) { + containsWatcher = contains(path, watcher, dataWatches); + } + + synchronized (existWatches) { + boolean contains_temp = contains(path, watcher, existWatches); + containsWatcher |= contains_temp; + } + + synchronized (persistentWatches) { + boolean contains_temp = contains(path, watcher, + persistentWatches); + containsWatcher |= contains_temp; + } + + synchronized (persistentRecursiveWatches) { + boolean contains_temp = contains(path, watcher, + persistentRecursiveWatches); + containsWatcher |= contains_temp; + } + break; + } + case Any: { + synchronized (childWatches) { + containsWatcher = contains(path, watcher, childWatches); + } + + synchronized (dataWatches) { + boolean contains_temp = contains(path, watcher, dataWatches); + containsWatcher |= contains_temp; + } + + synchronized (existWatches) { + boolean contains_temp = contains(path, watcher, existWatches); + containsWatcher |= contains_temp; + } + + synchronized (persistentWatches) { + boolean contains_temp = contains(path, watcher, + persistentWatches); + containsWatcher |= contains_temp; + } + + synchronized (persistentRecursiveWatches) { + boolean contains_temp = contains(path, watcher, + persistentRecursiveWatches); + containsWatcher |= contains_temp; + } + } + } + // Watcher function doesn't exists for the specified params + if (!containsWatcher) { + throw new KeeperException.NoWatcherException(path); + } + } + + protected boolean removeWatches( + Map<String, Set<Watcher>> pathVsWatcher, + Watcher watcher, + String path, + boolean local, + int rc, + Set<Watcher> removedWatchers) throws KeeperException { + if (!local && rc != KeeperException.Code.OK.intValue()) { + throw KeeperException.create(KeeperException.Code.get(rc), path); + } + boolean success = false; + // When local flag is true, remove watchers for the given path + // irrespective of rc. Otherwise shouldn't remove watchers locally + // when sees failure from server. + if (rc == KeeperException.Code.OK.intValue() || (local && rc != KeeperException.Code.OK.intValue())) { + // Remove all the watchers for the given path + if (watcher == null) { + Set<Watcher> pathWatchers = pathVsWatcher.remove(path); + if (pathWatchers != null) { + // found path watchers + removedWatchers.addAll(pathWatchers); + success = true; + } + } else { + Set<Watcher> watchers = pathVsWatcher.get(path); + if (watchers != null) { + if (watchers.remove(watcher)) { + // found path watcher + removedWatchers.add(watcher); + // cleanup <path vs watchlist> + if (watchers.size() <= 0) { + pathVsWatcher.remove(path); + } + success = true; + } + } + } + } + return success; + } + + /* (non-Javadoc) + * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, + * Event.EventType, java.lang.String) + */ + @Override + public Set<Watcher> materialize( + Watcher.Event.KeeperState state, + Watcher.Event.EventType type, + String clientPath + ) { + final Set<Watcher> result = new HashSet<>(); + + switch (type) { + case None: + if (defaultWatcher != null) { + result.add(defaultWatcher); + } + + boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; + synchronized (dataWatches) { + for (Set<Watcher> ws : dataWatches.values()) { + result.addAll(ws); + } + if (clear) { + dataWatches.clear(); + } + } + + synchronized (existWatches) { + for (Set<Watcher> ws : existWatches.values()) { + result.addAll(ws); + } + if (clear) { + existWatches.clear(); + } + } + + synchronized (childWatches) { + for (Set<Watcher> ws : childWatches.values()) { + result.addAll(ws); + } + if (clear) { + childWatches.clear(); + } + } + + synchronized (persistentWatches) { + for (Set<Watcher> ws: persistentWatches.values()) { + result.addAll(ws); + } + } + + synchronized (persistentRecursiveWatches) { + for (Set<Watcher> ws: persistentRecursiveWatches.values()) { + result.addAll(ws); + } + } + + return result; + case NodeDataChanged: + case NodeCreated: + synchronized (dataWatches) { + addTo(dataWatches.remove(clientPath), result); + } + synchronized (existWatches) { + addTo(existWatches.remove(clientPath), result); + } + addPersistentWatches(clientPath, result); + break; + case NodeChildrenChanged: + synchronized (childWatches) { + addTo(childWatches.remove(clientPath), result); + } + addPersistentWatches(clientPath, result); + break; + case NodeDeleted: + synchronized (dataWatches) { + addTo(dataWatches.remove(clientPath), result); + } + // TODO This shouldn't be needed, but just in case + synchronized (existWatches) { + Set<Watcher> list = existWatches.remove(clientPath); + if (list != null) { + addTo(list, result); + LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); + } + } + synchronized (childWatches) { + addTo(childWatches.remove(clientPath), result); + } + addPersistentWatches(clientPath, result); + break; + default: + String errorMsg = String.format( + "Unhandled watch event type %s with state %s on path %s", + type, + state, + clientPath); + LOG.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + return result; + } + + private void addPersistentWatches(String clientPath, Set<Watcher> result) { + synchronized (persistentWatches) { + addTo(persistentWatches.get(clientPath), result); + } + synchronized (persistentRecursiveWatches) { + for (String path : PathParentIterator.forAll(clientPath).asIterable()) { + addTo(persistentRecursiveWatches.get(path), result); + } + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index 0210493..201cf20 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -24,7 +24,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,10 +39,7 @@ import org.apache.zookeeper.AsyncCallback.MultiCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.KeeperException.NoWatcherException; import org.apache.zookeeper.OpResult.ErrorResult; -import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.client.HostProvider; @@ -85,7 +81,6 @@ import org.apache.zookeeper.proto.SyncRequest; import org.apache.zookeeper.proto.SyncResponse; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.EphemeralType; -import org.apache.zookeeper.server.watch.PathParentIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -231,8 +226,6 @@ public class ZooKeeper implements AutoCloseable { return cnxn.zooKeeperSaslClient; } - protected final ZKWatchManager watchManager; - private final ZKClientConfig clientConfig; public ZKClientConfig getClientConfig() { @@ -240,407 +233,37 @@ public class ZooKeeper implements AutoCloseable { } protected List<String> getDataWatches() { - synchronized (watchManager.dataWatches) { - List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet()); - return rc; - } + return getWatchManager().getDataWatchList(); } + protected List<String> getExistWatches() { - synchronized (watchManager.existWatches) { - List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet()); - return rc; - } + return getWatchManager().getExistWatchList(); } + protected List<String> getChildWatches() { - synchronized (watchManager.childWatches) { - List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet()); - return rc; - } + return getWatchManager().getChildWatchList(); } + protected List<String> getPersistentWatches() { - synchronized (watchManager.persistentWatches) { - List<String> rc = new ArrayList<String>(watchManager.persistentWatches.keySet()); - return rc; - } + return getWatchManager().getPersistentWatchList(); } + protected List<String> getPersistentRecursiveWatches() { - synchronized (watchManager.persistentRecursiveWatches) { - List<String> rc = new ArrayList<String>(watchManager.persistentRecursiveWatches.keySet()); - return rc; - } + return getWatchManager().getPersistentRecursiveWatchList(); } - /** - * Manage watchers and handle events generated by the ClientCnxn object. - * - * We are implementing this as a nested class of ZooKeeper so that - * the public methods will not be exposed as part of the ZooKeeper client - * API. - */ - static class ZKWatchManager implements ClientWatchManager { - - private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); - private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); - private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); - private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>(); - private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>(); - private boolean disableAutoWatchReset; - - ZKWatchManager(boolean disableAutoWatchReset) { - this.disableAutoWatchReset = disableAutoWatchReset; - } - - protected volatile Watcher defaultWatcher; - - private void addTo(Set<Watcher> from, Set<Watcher> to) { - if (from != null) { - to.addAll(from); - } - } - - public Map<EventType, Set<Watcher>> removeWatcher( - String clientPath, - Watcher watcher, - WatcherType watcherType, - boolean local, - int rc) throws KeeperException { - // Validate the provided znode path contains the given watcher of - // watcherType - containsWatcher(clientPath, watcher, watcherType); - - Map<EventType, Set<Watcher>> removedWatchers = new HashMap<>(); - HashSet<Watcher> childWatchersToRem = new HashSet<>(); - removedWatchers.put(EventType.ChildWatchRemoved, childWatchersToRem); - HashSet<Watcher> dataWatchersToRem = new HashSet<>(); - removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem); - HashSet<Watcher> persistentWatchersToRem = new HashSet<>(); - removedWatchers.put(EventType.PersistentWatchRemoved, persistentWatchersToRem); - boolean removedWatcher = false; - switch (watcherType) { - case Children: { - synchronized (childWatches) { - removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem); - } - break; - } - case Data: { - synchronized (dataWatches) { - removedWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem); - } - - synchronized (existWatches) { - boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem); - removedWatcher |= removedDataWatcher; - } - break; - } - case Any: { - synchronized (childWatches) { - removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem); - } - - synchronized (dataWatches) { - boolean removedDataWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem); - removedWatcher |= removedDataWatcher; - } - - synchronized (existWatches) { - boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem); - removedWatcher |= removedDataWatcher; - } - - synchronized (persistentWatches) { - boolean removedPersistentWatcher = removeWatches(persistentWatches, - watcher, clientPath, local, rc, persistentWatchersToRem); - removedWatcher |= removedPersistentWatcher; - } - - synchronized (persistentRecursiveWatches) { - boolean removedPersistentRecursiveWatcher = removeWatches(persistentRecursiveWatches, - watcher, clientPath, local, rc, persistentWatchersToRem); - removedWatcher |= removedPersistentRecursiveWatcher; - } - } - } - // Watcher function doesn't exists for the specified params - if (!removedWatcher) { - throw new KeeperException.NoWatcherException(clientPath); - } - return removedWatchers; - } - - private boolean contains(String path, Watcher watcherObj, Map<String, Set<Watcher>> pathVsWatchers) { - boolean watcherExists = true; - if (pathVsWatchers == null || pathVsWatchers.size() == 0) { - watcherExists = false; - } else { - Set<Watcher> watchers = pathVsWatchers.get(path); - if (watchers == null) { - watcherExists = false; - } else if (watcherObj == null) { - watcherExists = watchers.size() > 0; - } else { - watcherExists = watchers.contains(watcherObj); - } - } - return watcherExists; - } - - /** - * Validate the provided znode path contains the given watcher and - * watcherType - * - * @param path - * - client path - * @param watcher - * - watcher object reference - * @param watcherType - * - type of the watcher - * @throws NoWatcherException - */ - void containsWatcher(String path, Watcher watcher, WatcherType watcherType) throws NoWatcherException { - boolean containsWatcher = false; - switch (watcherType) { - case Children: { - synchronized (childWatches) { - containsWatcher = contains(path, watcher, childWatches); - } - - synchronized (persistentWatches) { - boolean contains_temp = contains(path, watcher, - persistentWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentRecursiveWatches) { - boolean contains_temp = contains(path, watcher, - persistentRecursiveWatches); - containsWatcher |= contains_temp; - } - break; - } - case Data: { - synchronized (dataWatches) { - containsWatcher = contains(path, watcher, dataWatches); - } - - synchronized (existWatches) { - boolean contains_temp = contains(path, watcher, existWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentWatches) { - boolean contains_temp = contains(path, watcher, - persistentWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentRecursiveWatches) { - boolean contains_temp = contains(path, watcher, - persistentRecursiveWatches); - containsWatcher |= contains_temp; - } - break; - } - case Any: { - synchronized (childWatches) { - containsWatcher = contains(path, watcher, childWatches); - } - - synchronized (dataWatches) { - boolean contains_temp = contains(path, watcher, dataWatches); - containsWatcher |= contains_temp; - } - - synchronized (existWatches) { - boolean contains_temp = contains(path, watcher, existWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentWatches) { - boolean contains_temp = contains(path, watcher, - persistentWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentRecursiveWatches) { - boolean contains_temp = contains(path, watcher, - persistentRecursiveWatches); - containsWatcher |= contains_temp; - } - } - } - // Watcher function doesn't exists for the specified params - if (!containsWatcher) { - throw new KeeperException.NoWatcherException(path); - } - } - - protected boolean removeWatches( - Map<String, Set<Watcher>> pathVsWatcher, - Watcher watcher, - String path, - boolean local, - int rc, - Set<Watcher> removedWatchers) throws KeeperException { - if (!local && rc != Code.OK.intValue()) { - throw KeeperException.create(KeeperException.Code.get(rc), path); - } - boolean success = false; - // When local flag is true, remove watchers for the given path - // irrespective of rc. Otherwise shouldn't remove watchers locally - // when sees failure from server. - if (rc == Code.OK.intValue() || (local && rc != Code.OK.intValue())) { - // Remove all the watchers for the given path - if (watcher == null) { - Set<Watcher> pathWatchers = pathVsWatcher.remove(path); - if (pathWatchers != null) { - // found path watchers - removedWatchers.addAll(pathWatchers); - success = true; - } - } else { - Set<Watcher> watchers = pathVsWatcher.get(path); - if (watchers != null) { - if (watchers.remove(watcher)) { - // found path watcher - removedWatchers.add(watcher); - // cleanup <path vs watchlist> - if (watchers.size() <= 0) { - pathVsWatcher.remove(path); - } - success = true; - } - } - } - } - return success; - } - - /* (non-Javadoc) - * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, - * Event.EventType, java.lang.String) - */ - @Override - public Set<Watcher> materialize( - Watcher.Event.KeeperState state, - Watcher.Event.EventType type, - String clientPath - ) { - final Set<Watcher> result = new HashSet<>(); - - switch (type) { - case None: - if (defaultWatcher != null) { - result.add(defaultWatcher); - } - - boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; - synchronized (dataWatches) { - for (Set<Watcher> ws : dataWatches.values()) { - result.addAll(ws); - } - if (clear) { - dataWatches.clear(); - } - } - - synchronized (existWatches) { - for (Set<Watcher> ws : existWatches.values()) { - result.addAll(ws); - } - if (clear) { - existWatches.clear(); - } - } - - synchronized (childWatches) { - for (Set<Watcher> ws : childWatches.values()) { - result.addAll(ws); - } - if (clear) { - childWatches.clear(); - } - } - - synchronized (persistentWatches) { - for (Set<Watcher> ws: persistentWatches.values()) { - result.addAll(ws); - } - } - - synchronized (persistentRecursiveWatches) { - for (Set<Watcher> ws: persistentRecursiveWatches.values()) { - result.addAll(ws); - } - } - - return result; - case NodeDataChanged: - case NodeCreated: - synchronized (dataWatches) { - addTo(dataWatches.remove(clientPath), result); - } - synchronized (existWatches) { - addTo(existWatches.remove(clientPath), result); - } - addPersistentWatches(clientPath, result); - break; - case NodeChildrenChanged: - synchronized (childWatches) { - addTo(childWatches.remove(clientPath), result); - } - addPersistentWatches(clientPath, result); - break; - case NodeDeleted: - synchronized (dataWatches) { - addTo(dataWatches.remove(clientPath), result); - } - // TODO This shouldn't be needed, but just in case - synchronized (existWatches) { - Set<Watcher> list = existWatches.remove(clientPath); - if (list != null) { - addTo(list, result); - LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); - } - } - synchronized (childWatches) { - addTo(childWatches.remove(clientPath), result); - } - addPersistentWatches(clientPath, result); - break; - default: - String errorMsg = String.format( - "Unhandled watch event type %s with state %s on path %s", - type, - state, - clientPath); - LOG.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - return result; - } - - private void addPersistentWatches(String clientPath, Set<Watcher> result) { - synchronized (persistentWatches) { - addTo(persistentWatches.get(clientPath), result); - } - synchronized (persistentRecursiveWatches) { - for (String path : PathParentIterator.forAll(clientPath).asIterable()) { - addTo(persistentRecursiveWatches.get(path), result); - } - } - } + ZKWatchManager getWatchManager() { + return cnxn.getWatcherManager(); } /** * Register a watcher for a particular path. */ - public abstract class WatchRegistration { + public abstract static class WatchRegistration { private Watcher watcher; private String clientPath; + public WatchRegistration(Watcher watcher, String clientPath) { this.watcher = watcher; this.clientPath = clientPath; @@ -689,7 +312,7 @@ public class ZooKeeper implements AutoCloseable { @Override protected Map<String, Set<Watcher>> getWatches(int rc) { - return rc == 0 ? watchManager.dataWatches : watchManager.existWatches; + return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches(); } @Override @@ -707,7 +330,7 @@ public class ZooKeeper implements AutoCloseable { @Override protected Map<String, Set<Watcher>> getWatches(int rc) { - return watchManager.dataWatches; + return getWatchManager().getDataWatches(); } } @@ -720,7 +343,7 @@ public class ZooKeeper implements AutoCloseable { @Override protected Map<String, Set<Watcher>> getWatches(int rc) { - return watchManager.childWatches; + return getWatchManager().getChildWatches(); } } @@ -737,9 +360,9 @@ public class ZooKeeper implements AutoCloseable { protected Map<String, Set<Watcher>> getWatches(int rc) { switch (mode) { case PERSISTENT: - return watchManager.persistentWatches; + return getWatchManager().getPersistentWatches(); case PERSISTENT_RECURSIVE: - return watchManager.persistentRecursiveWatches; + return getWatchManager().getPersistentRecursiveWatches(); } throw new IllegalArgumentException("Mode not supported: " + mode); } @@ -989,7 +612,7 @@ public class ZooKeeper implements AutoCloseable { * connects to one in read-only mode, i.e. read requests are * allowed while write requests are not. It continues seeking for * majority in the background. - * @param aHostProvider + * @param hostProvider * use this as HostProvider to enable custom behaviour. * @param clientConfig * (added in 3.5.2) passing this conf object gives each client the flexibility of @@ -1004,49 +627,45 @@ public class ZooKeeper implements AutoCloseable { int sessionTimeout, Watcher watcher, boolean canBeReadOnly, - HostProvider aHostProvider, - ZKClientConfig clientConfig) throws IOException { + HostProvider hostProvider, + ZKClientConfig clientConfig + ) throws IOException { LOG.info( "Initiating client connection, connectString={} sessionTimeout={} watcher={}", connectString, sessionTimeout, watcher); - if (clientConfig == null) { - clientConfig = new ZKClientConfig(); - } - this.clientConfig = clientConfig; - watchManager = defaultWatchManager(); - watchManager.defaultWatcher = watcher; + this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig(); + this.hostProvider = hostProvider; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); - hostProvider = aHostProvider; cnxn = createConnection( connectStringParser.getChrootPath(), hostProvider, sessionTimeout, - this, - watchManager, + this.clientConfig, + watcher, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } - // @VisibleForTesting - protected ClientCnxn createConnection( + ClientCnxn createConnection( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { + boolean canBeReadOnly + ) throws IOException { return new ClientCnxn( chrootPath, hostProvider, sessionTimeout, - this, - watchManager, + clientConfig, + defaultWatcher, clientCnxnSocket, canBeReadOnly); } @@ -1383,7 +1002,7 @@ public class ZooKeeper implements AutoCloseable { * connects to one in read-only mode, i.e. read requests are * allowed while write requests are not. It continues seeking for * majority in the background. - * @param aHostProvider + * @param hostProvider * use this as HostProvider to enable custom behaviour. * @param clientConfig * (added in 3.5.2) passing this conf object gives each client the flexibility of @@ -1400,7 +1019,7 @@ public class ZooKeeper implements AutoCloseable { long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, - HostProvider aHostProvider, + HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException { LOG.info( "Initiating client connection, connectString={} " @@ -1411,22 +1030,16 @@ public class ZooKeeper implements AutoCloseable { Long.toHexString(sessionId), (sessionPasswd == null ? "<null>" : "<hidden>")); - if (clientConfig == null) { - clientConfig = new ZKClientConfig(); - } - this.clientConfig = clientConfig; - watchManager = defaultWatchManager(); - watchManager.defaultWatcher = watcher; - + this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig(); ConnectStringParser connectStringParser = new ConnectStringParser(connectString); - hostProvider = aHostProvider; + this.hostProvider = hostProvider; cnxn = new ClientCnxn( connectStringParser.getChrootPath(), hostProvider, sessionTimeout, - this, - watchManager, + this.clientConfig, + watcher, getClientCnxnSocket(), sessionId, sessionPasswd, @@ -1523,11 +1136,6 @@ public class ZooKeeper implements AutoCloseable { return new ZooKeeperTestable(cnxn); } - /* Useful for testing watch handling behavior */ - protected ZKWatchManager defaultWatchManager() { - return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)); - } - /** * The session id for this ZooKeeper client instance. The value returned is * not valid until the client connects to a server and may change after a @@ -1582,11 +1190,9 @@ public class ZooKeeper implements AutoCloseable { /** * Specify the default watcher for the connection (overrides the one * specified during construction). - * - * @param watcher */ public synchronized void register(Watcher watcher) { - watchManager.defaultWatcher = watcher; + getWatchManager().setDefaultWatcher(watcher); } /** @@ -3213,9 +2819,11 @@ public class ZooKeeper implements AutoCloseable { * error code. * @since 3.6.0 */ - public void addWatch(String basePath, AddWatchMode mode) - throws KeeperException, InterruptedException { - addWatch(basePath, watchManager.defaultWatcher, mode); + public void addWatch( + String basePath, + AddWatchMode mode + ) throws KeeperException, InterruptedException { + addWatch(basePath, getWatchManager().getDefaultWatcher(), mode); } /** @@ -3229,8 +2837,12 @@ public class ZooKeeper implements AutoCloseable { * @throws IllegalArgumentException if an invalid path is specified * @since 3.6.0 */ - public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, - VoidCallback cb, Object ctx) { + public void addWatch( + String basePath, + Watcher watcher, AddWatchMode mode, + VoidCallback cb, + Object ctx + ) { PathUtils.validatePath(basePath); String serverPath = prependChroot(basePath); @@ -3251,9 +2863,8 @@ public class ZooKeeper implements AutoCloseable { * @throws IllegalArgumentException if an invalid path is specified * @since 3.6.0 */ - public void addWatch(String basePath, AddWatchMode mode, - VoidCallback cb, Object ctx) { - addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx); + public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) { + addWatch(basePath, getWatchManager().getDefaultWatcher(), mode, cb, ctx); } private void validateWatcher(Watcher watcher) { @@ -3271,7 +2882,7 @@ public class ZooKeeper implements AutoCloseable { PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); - WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager); + WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager()); RequestHeader h = new RequestHeader(); h.setType(opCode); @@ -3294,7 +2905,7 @@ public class ZooKeeper implements AutoCloseable { PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); - WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager); + WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager()); RequestHeader h = new RequestHeader(); h.setType(opCode); @@ -3424,8 +3035,9 @@ public class ZooKeeper implements AutoCloseable { */ private Watcher getDefaultWatcher(boolean required) { if (required) { - if (watchManager.defaultWatcher != null) { - return watchManager.defaultWatcher; + final Watcher defaultWatcher = getWatchManager().getDefaultWatcher(); + if (defaultWatcher != null) { + return defaultWatcher; } else { throw new IllegalStateException("Default watcher is required, but it is null."); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java index 2c7f6bd..5ed43fc 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -281,11 +281,19 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase { String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig zkClientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { - super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + boolean canBeReadOnly + ) throws IOException { + super( + chrootPath, + hostProvider, + sessionTimeout, + zkClientConfig, + defaultWatcher, + clientCnxnSocket, + canBeReadOnly); } void attemptClose() { @@ -344,18 +352,25 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase { return cnxn.getState().isAlive(); } - @Override - protected ClientCnxn createConnection( + ClientCnxn createConnection( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { + boolean canBeReadOnly + ) throws IOException { Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO); socket = (FragileClientCnxnSocketNIO) clientCnxnSocket; - ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn( + chrootPath, + hostProvider, + sessionTimeout, + clientConfig, + defaultWatcher, + clientCnxnSocket, + canBeReadOnly); return ClientCnxnSocketFragilityTest.this.cnxn; } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java index e77283c..41f52ea 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java @@ -66,8 +66,14 @@ public class ClientReconnectTest extends ZKTestCase { sc = SocketChannel.open(); ClientCnxnSocketNIO nioCnxn = new MockCnxn(); - ClientWatchManager watcher = mock(ClientWatchManager.class); - ClientCnxn clientCnxn = new ClientCnxn("tmp", hostProvider, 5000, zk, watcher, nioCnxn, false); + ClientCnxn clientCnxn = new ClientCnxn( + "tmp", + hostProvider, + 5000, + zk.getClientConfig(), + DummyWatcher.INSTANCE, + nioCnxn, + false); clientCnxn.start(); countDownLatch.await(5000, TimeUnit.MILLISECONDS); assertTrue(countDownLatch.getCount() == 0); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java index 7c3bf51..d4f95ae 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; @@ -110,15 +111,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase { class CustomClientCnxn extends ClientCnxn { - public CustomClientCnxn( + CustomClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { - super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + boolean canBeReadOnly + ) throws IOException { + super( + chrootPath, + hostProvider, + sessionTimeout, + clientConfig, + defaultWatcher, + clientCnxnSocket, + canBeReadOnly); } @Override @@ -140,15 +149,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase { } @Override - protected ClientCnxn createConnection( + ClientCnxn createConnection( String chrootPath, HostProvider hostProvider, int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, + ZKClientConfig clientConfig, + Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { - return new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + boolean canBeReadOnly + ) throws IOException { + return new CustomClientCnxn( + chrootPath, + hostProvider, + sessionTimeout, + clientConfig, + defaultWatcher, + clientCnxnSocket, + canBeReadOnly); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java index 30f1558..8bb91e3 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java @@ -18,12 +18,16 @@ package org.apache.zookeeper; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -37,11 +41,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.collections.CollectionUtils; import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.KeeperException.NoWatcherException; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.test.ClientBase; import org.junit.Test; @@ -657,7 +659,9 @@ public class RemoveWatchesTest extends ClientBase { @Test(timeout = 90000) public void testNoWatcherServerException() throws InterruptedException, IOException, TimeoutException { CountdownWatcher watcher = new CountdownWatcher(); - MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher); + ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher)); + MyWatchManager watchManager = new MyWatchManager(false, watcher); + doReturn(watchManager).when(zk).getWatchManager(); boolean nw = false; watcher.waitForConnected(CONNECTION_TIMEOUT); @@ -670,8 +674,8 @@ public class RemoveWatchesTest extends ClientBase { } } - assertTrue("Server didn't return NOWATCHER", zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue()); - assertTrue("NoWatcherException didn't happen", nw); + assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue())); + assertThat("NoWatcherException didn't happen", nw, is(true)); } /** @@ -920,54 +924,32 @@ public class RemoveWatchesTest extends ClientBase { assertEquals("Received watch notification after removal!", 2, watchCount.getCount()); } - /* a mocked ZK class that doesn't do client-side verification - * before/after calling removeWatches */ - private class MyZooKeeper extends ZooKeeper { + private static class MyWatchManager extends ZKWatchManager { - class MyWatchManager extends ZKWatchManager { + int lastReturnCode; - public MyWatchManager(boolean disableAutoWatchReset) { - super(disableAutoWatchReset); - } - - public int lastrc; - - /* Pretend that any watcher exists */ - void containsWatcher(String path, Watcher watcher, WatcherType watcherType) throws NoWatcherException { - } - - /* save the return error code by the server */ - protected boolean removeWatches( - Map<String, Set<Watcher>> pathVsWatcher, - Watcher watcher, - String path, - boolean local, - int rc, - Set<Watcher> removedWatchers) throws KeeperException { - lastrc = rc; - return false; - } - - } - - public MyZooKeeper(String hp, int timeout, Watcher watcher) throws IOException { - super(hp, timeout, watcher, false); + MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) { + super(disableAutoWatchReset, defaultWatcher); } - private MyWatchManager myWatchManager; - - protected ZKWatchManager defaultWatchManager() { - myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)); - return myWatchManager; + void containsWatcher(String path, Watcher watcher, WatcherType watcherType) { + // prevent contains watcher } - public int getRemoveWatchesRC() { - return myWatchManager.lastrc; + @Override + protected boolean removeWatches( + Map<String, Set<Watcher>> pathVsWatcher, + Watcher watcher, + String path, + boolean local, + int rc, + Set<Watcher> removedWatchers) { + lastReturnCode = rc; + return false; } - } - private class MyWatcher implements Watcher { + private static class MyWatcher implements Watcher { private final String path; private String eventPath; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java index a02127a..7f9e41e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java @@ -20,12 +20,10 @@ package org.apache.zookeeper; import java.io.IOException; import java.net.SocketAddress; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.jute.Record; import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.apache.zookeeper.client.HostProvider; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; @@ -35,76 +33,12 @@ public class TestableZooKeeper extends ZooKeeperAdmin { super(host, sessionTimeout, watcher); } - class TestableClientCnxn extends ClientCnxn { - - TestableClientCnxn( - String chrootPath, - HostProvider hostProvider, - int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, - ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { - super(chrootPath, - hostProvider, - sessionTimeout, - zooKeeper, - watcher, - clientCnxnSocket, - 0, - new byte[16], - canBeReadOnly); - } - - void setXid(int newXid) { - xid = newXid; - } - - int checkXid() { - return xid; - } - - } - - protected ClientCnxn createConnection( - String chrootPath, - HostProvider hostProvider, - int sessionTimeout, - ZooKeeper zooKeeper, - ClientWatchManager watcher, - ClientCnxnSocket clientCnxnSocket, - boolean canBeReadOnly) throws IOException { - return new TestableClientCnxn( - chrootPath, - hostProvider, - sessionTimeout, - this, - watcher, - clientCnxnSocket, - canBeReadOnly); - } - public void setXid(int xid) { - ((TestableClientCnxn) cnxn).setXid(xid); + cnxn.xid = xid; } public int checkXid() { - return ((TestableClientCnxn) cnxn).checkXid(); - } - - @Override - public List<String> getChildWatches() { - return super.getChildWatches(); - } - - @Override - public List<String> getDataWatches() { - return super.getDataWatches(); - } - - @Override - public List<String> getExistWatches() { - return super.getExistWatches(); + return cnxn.xid; } /**