This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new d99c9e8 ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn
and ZooKeeper
d99c9e8 is described below
commit d99c9e8b701ebdbd3f0098d4d81696df97432262
Author: tison <[email protected]>
AuthorDate: Sun May 17 15:13:37 2020 +0200
ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooKeeper
1. Extract ZKWatchManager into its own file
2. Move the ZKWatchManager instance to ClientCnxn in order to eliminate the cycle
dependency
3. Let `ZooKeeper` sync a copy of the default watcher, in order to reduce
dependencies on `getWatchManager()`
Author: tison <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Andor Molnar
<[email protected]>
Closes #1095 from TisonKun/ZOOKEEPER-837
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 110 ++---
.../org/apache/zookeeper/WatchDeregistration.java | 1 -
.../java/org/apache/zookeeper/ZKWatchManager.java | 455 +++++++++++++++++++
.../main/java/org/apache/zookeeper/ZooKeeper.java | 504 +++------------------
.../zookeeper/ClientCnxnSocketFragilityTest.java | 35 +-
.../org/apache/zookeeper/ClientReconnectTest.java | 10 +-
.../apache/zookeeper/ClientRequestTimeoutTest.java | 37 +-
.../org/apache/zookeeper/RemoveWatchesTest.java | 70 ++-
.../org/apache/zookeeper/TestableZooKeeper.java | 70 +--
9 files changed, 651 insertions(+), 641 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index c87d3cb..87094b8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -165,13 +165,11 @@ public class ClientCnxn {
private final int sessionTimeout;
- private final ZooKeeper zooKeeper;
-
- private final ClientWatchManager watcher;
+ private final ZKWatchManager watchManager;
private long sessionId;
- private byte[] sessionPasswd = new byte[16];
+ private byte[] sessionPasswd;
/**
* If true, the connection is allowed to go to r-o mode. This field's value
@@ -224,6 +222,10 @@ public class ClientCnxn {
*/
private long requestTimeout;
+ ZKWatchManager getWatcherManager() {
+ return watchManager;
+ }
+
public long getSessionId() {
return sessionId;
}
@@ -362,35 +364,29 @@ public class ClientCnxn {
* established until needed. The start() instance method must be called
* subsequent to construction.
*
- * @param chrootPath - the chroot of this client. Should be removed from
this Class in ZOOKEEPER-838
- * @param hostProvider
- * the list of ZooKeeper servers to connect to
- * @param sessionTimeout
- * the timeout for connections.
- * @param zooKeeper
- * the zookeeper object that this connection is related to.
- * @param watcher watcher for this connection
- * @param clientCnxnSocket
- * the socket implementation used (e.g. NIO/Netty)
- * @param canBeReadOnly
- * whether the connection is allowed to go to read-only
- * mode in case of partitioning
- * @throws IOException
+ * @param chrootPath the chroot of this client. Should be removed from
this Class in ZOOKEEPER-838
+ * @param hostProvider the list of ZooKeeper servers to connect to
+ * @param sessionTimeout the timeout for connections.
+ * @param clientConfig the client configuration.
+ * @param defaultWatcher default watcher for this connection
+ * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
+ * @param canBeReadOnly whether the connection is allowed to go to
read-only mode in case of partitioning
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
+ boolean canBeReadOnly
+ ) throws IOException {
this(
chrootPath,
hostProvider,
sessionTimeout,
- zooKeeper,
- watcher,
+ clientConfig,
+ defaultWatcher,
clientCnxnSocket,
0,
new byte[16],
@@ -402,48 +398,45 @@ public class ClientCnxn {
* established until needed. The start() instance method must be called
* subsequent to construction.
*
- * @param chrootPath - the chroot of this client. Should be removed from
this Class in ZOOKEEPER-838
- * @param hostProvider
- * the list of ZooKeeper servers to connect to
- * @param sessionTimeout
- * the timeout for connections.
- * @param zooKeeper
- * the zookeeper object that this connection is related to.
- * @param watcher watcher for this connection
- * @param clientCnxnSocket
- * the socket implementation used (e.g. NIO/Netty)
+ * @param chrootPath the chroot of this client. Should be removed from
this Class in ZOOKEEPER-838
+ * @param hostProvider the list of ZooKeeper servers to connect to
+ * @param sessionTimeout the timeout for connections.
+ * @param clientConfig the client configuration.
+ * @param defaultWatcher default watcher for this connection
+ * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
- * @param canBeReadOnly
- * whether the connection is allowed to go to read-only
- * mode in case of partitioning
+ * @param canBeReadOnly whether the connection is allowed to go to
read-only mode in case of partitioning
* @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
- boolean canBeReadOnly) throws IOException {
- this.zooKeeper = zooKeeper;
- this.watcher = watcher;
+ boolean canBeReadOnly
+ ) throws IOException {
+ this.chrootPath = chrootPath;
+ this.hostProvider = hostProvider;
+ this.sessionTimeout = sessionTimeout;
+ this.clientConfig = clientConfig;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
- this.sessionTimeout = sessionTimeout;
- this.hostProvider = hostProvider;
- this.chrootPath = chrootPath;
+ this.readOnly = canBeReadOnly;
+
+ this.watchManager = new ZKWatchManager(
+
clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
+ defaultWatcher);
- connectTimeout = sessionTimeout / hostProvider.size();
- readTimeout = sessionTimeout * 2 / 3;
- readOnly = canBeReadOnly;
+ this.connectTimeout = sessionTimeout / hostProvider.size();
+ this.readTimeout = sessionTimeout * 2 / 3;
- sendThread = new SendThread(clientCnxnSocket);
- eventThread = new EventThread();
- this.clientConfig = zooKeeper.getClientConfig();
+ this.sendThread = new SendThread(clientCnxnSocket);
+ this.eventThread = new EventThread();
initRequestTimeout();
}
@@ -506,10 +499,9 @@ public class ClientCnxn {
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
- watchers = watcher.materialize(event.getState(),
event.getType(), event.getPath());
+ watchers = watchManager.materialize(event.getState(),
event.getType(), event.getPath());
} else {
- watchers = new HashSet<Watcher>();
- watchers.addAll(materializedWatchers);
+ watchers = new HashSet<>(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers,
event);
// queue the pair (watch set & event) for later processing
@@ -1007,14 +999,12 @@ public class ClientCnxn {
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
- // TODO: here we have the only remaining use of zooKeeper in
- // this class. It's to be eliminated!
if
(!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
- List<String> dataWatches = zooKeeper.getDataWatches();
- List<String> existWatches = zooKeeper.getExistWatches();
- List<String> childWatches = zooKeeper.getChildWatches();
- List<String> persistentWatches =
zooKeeper.getPersistentWatches();
- List<String> persistentRecursiveWatches =
zooKeeper.getPersistentRecursiveWatches();
+ List<String> dataWatches = watchManager.getDataWatchList();
+ List<String> existWatches = watchManager.getExistWatchList();
+ List<String> childWatches = watchManager.getChildWatchList();
+ List<String> persistentWatches =
watchManager.getPersistentWatchList();
+ List<String> persistentRecursiveWatches =
watchManager.getPersistentRecursiveWatchList();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() ||
!childWatches.isEmpty()
|| !persistentWatches.isEmpty() ||
!persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter =
prependChroot(dataWatches).iterator();
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
index 16c7f84..710f47b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper.ZKWatchManager;
/**
* Handles the special case of removing watches which has registered for a
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
new file mode 100644
index 0000000..95a07f0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.server.watch.PathParentIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage watchers and handle events generated by the {@link ClientCnxn}
object.
+ *
+ * This class is intended to be package-private so that it doesn't serve
+ * as part of the ZooKeeper client API.
+ */
+class ZKWatchManager implements ClientWatchManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZKWatchManager.class);
+
+ private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
+ private final Map<String, Set<Watcher>> existWatches = new HashMap<>();
+ private final Map<String, Set<Watcher>> childWatches = new HashMap<>();
+ private final Map<String, Set<Watcher>> persistentWatches = new
HashMap<>();
+ private final Map<String, Set<Watcher>> persistentRecursiveWatches = new
HashMap<>();
+ private final boolean disableAutoWatchReset;
+
+ private volatile Watcher defaultWatcher;
+
+ ZKWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
+ this.disableAutoWatchReset = disableAutoWatchReset;
+ this.defaultWatcher = defaultWatcher;
+ }
+
+ void setDefaultWatcher(Watcher defaultWatcher) {
+ this.defaultWatcher = defaultWatcher;
+ }
+
+ Watcher getDefaultWatcher() {
+ return defaultWatcher;
+ }
+
+ List<String> getDataWatchList() {
+ synchronized (dataWatches) {
+ return new ArrayList<>(dataWatches.keySet());
+ }
+ }
+
+ List<String> getChildWatchList() {
+ synchronized (childWatches) {
+ return new ArrayList<>(childWatches.keySet());
+ }
+ }
+
+ List<String> getExistWatchList() {
+ synchronized (existWatches) {
+ return new ArrayList<>(existWatches.keySet());
+ }
+ }
+
+ List<String> getPersistentWatchList() {
+ synchronized (persistentWatches) {
+ return new ArrayList<>(persistentWatches.keySet());
+ }
+ }
+
+ List<String> getPersistentRecursiveWatchList() {
+ synchronized (persistentRecursiveWatches) {
+ return new ArrayList<>(persistentRecursiveWatches.keySet());
+ }
+ }
+
+ Map<String, Set<Watcher>> getDataWatches() {
+ return dataWatches;
+ }
+
+ Map<String, Set<Watcher>> getExistWatches() {
+ return existWatches;
+ }
+
+ Map<String, Set<Watcher>> getChildWatches() {
+ return childWatches;
+ }
+
+ Map<String, Set<Watcher>> getPersistentWatches() {
+ return persistentWatches;
+ }
+
+ Map<String, Set<Watcher>> getPersistentRecursiveWatches() {
+ return persistentRecursiveWatches;
+ }
+
+ private void addTo(Set<Watcher> from, Set<Watcher> to) {
+ if (from != null) {
+ to.addAll(from);
+ }
+ }
+
+ public Map<Watcher.Event.EventType, Set<Watcher>> removeWatcher(
+ String clientPath,
+ Watcher watcher,
+ Watcher.WatcherType watcherType,
+ boolean local,
+ int rc
+ ) throws KeeperException {
+ // Validate the provided znode path contains the given watcher of
+ // watcherType
+ containsWatcher(clientPath, watcher, watcherType);
+
+ Map<Watcher.Event.EventType, Set<Watcher>> removedWatchers = new
HashMap<>();
+ HashSet<Watcher> childWatchersToRem = new HashSet<>();
+ removedWatchers.put(Watcher.Event.EventType.ChildWatchRemoved,
childWatchersToRem);
+ HashSet<Watcher> dataWatchersToRem = new HashSet<>();
+ removedWatchers.put(Watcher.Event.EventType.DataWatchRemoved,
dataWatchersToRem);
+ HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
+ removedWatchers.put(Watcher.Event.EventType.PersistentWatchRemoved,
persistentWatchersToRem);
+ boolean removedWatcher = false;
+ switch (watcherType) {
+ case Children: {
+ synchronized (childWatches) {
+ removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
+ }
+ break;
+ }
+ case Data: {
+ synchronized (dataWatches) {
+ removedWatcher = removeWatches(dataWatches, watcher,
clientPath, local, rc, dataWatchersToRem);
+ }
+
+ synchronized (existWatches) {
+ boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
+ removedWatcher |= removedDataWatcher;
+ }
+ break;
+ }
+ case Any: {
+ synchronized (childWatches) {
+ removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
+ }
+
+ synchronized (dataWatches) {
+ boolean removedDataWatcher = removeWatches(dataWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
+ removedWatcher |= removedDataWatcher;
+ }
+
+ synchronized (existWatches) {
+ boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
+ removedWatcher |= removedDataWatcher;
+ }
+
+ synchronized (persistentWatches) {
+ boolean removedPersistentWatcher =
removeWatches(persistentWatches,
+ watcher, clientPath, local, rc,
persistentWatchersToRem);
+ removedWatcher |= removedPersistentWatcher;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean removedPersistentRecursiveWatcher =
removeWatches(persistentRecursiveWatches,
+ watcher, clientPath, local, rc,
persistentWatchersToRem);
+ removedWatcher |= removedPersistentRecursiveWatcher;
+ }
+ }
+ }
+ // Watcher function doesn't exist for the specified params
+ if (!removedWatcher) {
+ throw new KeeperException.NoWatcherException(clientPath);
+ }
+ return removedWatchers;
+ }
+
+ private boolean contains(String path, Watcher watcherObj, Map<String,
Set<Watcher>> pathVsWatchers) {
+ boolean watcherExists = true;
+ if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
+ watcherExists = false;
+ } else {
+ Set<Watcher> watchers = pathVsWatchers.get(path);
+ if (watchers == null) {
+ watcherExists = false;
+ } else if (watcherObj == null) {
+ watcherExists = watchers.size() > 0;
+ } else {
+ watcherExists = watchers.contains(watcherObj);
+ }
+ }
+ return watcherExists;
+ }
+
+ /**
+ * Validate the provided znode path contains the given watcher and
+ * watcherType
+ *
+ * @param path
+ * - client path
+ * @param watcher
+ * - watcher object reference
+ * @param watcherType
+ * - type of the watcher
+ * @throws KeeperException.NoWatcherException
+ */
+ void containsWatcher(String path, Watcher watcher, Watcher.WatcherType
watcherType) throws
+ KeeperException.NoWatcherException {
+ boolean containsWatcher = false;
+ switch (watcherType) {
+ case Children: {
+ synchronized (childWatches) {
+ containsWatcher = contains(path, watcher, childWatches);
+ }
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
+ break;
+ }
+ case Data: {
+ synchronized (dataWatches) {
+ containsWatcher = contains(path, watcher, dataWatches);
+ }
+
+ synchronized (existWatches) {
+ boolean contains_temp = contains(path, watcher, existWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
+ break;
+ }
+ case Any: {
+ synchronized (childWatches) {
+ containsWatcher = contains(path, watcher, childWatches);
+ }
+
+ synchronized (dataWatches) {
+ boolean contains_temp = contains(path, watcher, dataWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (existWatches) {
+ boolean contains_temp = contains(path, watcher, existWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
+ }
+ }
+ // Watcher function doesn't exist for the specified params
+ if (!containsWatcher) {
+ throw new KeeperException.NoWatcherException(path);
+ }
+ }
+
+ protected boolean removeWatches(
+ Map<String, Set<Watcher>> pathVsWatcher,
+ Watcher watcher,
+ String path,
+ boolean local,
+ int rc,
+ Set<Watcher> removedWatchers) throws KeeperException {
+ if (!local && rc != KeeperException.Code.OK.intValue()) {
+ throw KeeperException.create(KeeperException.Code.get(rc), path);
+ }
+ boolean success = false;
+ // When the local flag is true, remove watchers for the given path
+ // irrespective of rc. Otherwise, watchers shouldn't be removed locally
+ // when the server reports a failure.
+ if (rc == KeeperException.Code.OK.intValue() || (local && rc !=
KeeperException.Code.OK.intValue())) {
+ // Remove all the watchers for the given path
+ if (watcher == null) {
+ Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
+ if (pathWatchers != null) {
+ // found path watchers
+ removedWatchers.addAll(pathWatchers);
+ success = true;
+ }
+ } else {
+ Set<Watcher> watchers = pathVsWatcher.get(path);
+ if (watchers != null) {
+ if (watchers.remove(watcher)) {
+ // found path watcher
+ removedWatchers.add(watcher);
+ // cleanup <path vs watchlist>
+ if (watchers.size() <= 0) {
+ pathVsWatcher.remove(path);
+ }
+ success = true;
+ }
+ }
+ }
+ }
+ return success;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
+ * Event.EventType,
java.lang.String)
+ */
+ @Override
+ public Set<Watcher> materialize(
+ Watcher.Event.KeeperState state,
+ Watcher.Event.EventType type,
+ String clientPath
+ ) {
+ final Set<Watcher> result = new HashSet<>();
+
+ switch (type) {
+ case None:
+ if (defaultWatcher != null) {
+ result.add(defaultWatcher);
+ }
+
+ boolean clear = disableAutoWatchReset && state !=
Watcher.Event.KeeperState.SyncConnected;
+ synchronized (dataWatches) {
+ for (Set<Watcher> ws : dataWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
+ dataWatches.clear();
+ }
+ }
+
+ synchronized (existWatches) {
+ for (Set<Watcher> ws : existWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
+ existWatches.clear();
+ }
+ }
+
+ synchronized (childWatches) {
+ for (Set<Watcher> ws : childWatches.values()) {
+ result.addAll(ws);
+ }
+ if (clear) {
+ childWatches.clear();
+ }
+ }
+
+ synchronized (persistentWatches) {
+ for (Set<Watcher> ws: persistentWatches.values()) {
+ result.addAll(ws);
+ }
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
+ result.addAll(ws);
+ }
+ }
+
+ return result;
+ case NodeDataChanged:
+ case NodeCreated:
+ synchronized (dataWatches) {
+ addTo(dataWatches.remove(clientPath), result);
+ }
+ synchronized (existWatches) {
+ addTo(existWatches.remove(clientPath), result);
+ }
+ addPersistentWatches(clientPath, result);
+ break;
+ case NodeChildrenChanged:
+ synchronized (childWatches) {
+ addTo(childWatches.remove(clientPath), result);
+ }
+ addPersistentWatches(clientPath, result);
+ break;
+ case NodeDeleted:
+ synchronized (dataWatches) {
+ addTo(dataWatches.remove(clientPath), result);
+ }
+ // TODO This shouldn't be needed, but just in case
+ synchronized (existWatches) {
+ Set<Watcher> list = existWatches.remove(clientPath);
+ if (list != null) {
+ addTo(list, result);
+ LOG.warn("We are triggering an exists watch for delete!
Shouldn't happen!");
+ }
+ }
+ synchronized (childWatches) {
+ addTo(childWatches.remove(clientPath), result);
+ }
+ addPersistentWatches(clientPath, result);
+ break;
+ default:
+ String errorMsg = String.format(
+ "Unhandled watch event type %s with state %s on path %s",
+ type,
+ state,
+ clientPath);
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ return result;
+ }
+
+ private void addPersistentWatches(String clientPath, Set<Watcher> result) {
+ synchronized (persistentWatches) {
+ addTo(persistentWatches.get(clientPath), result);
+ }
+ synchronized (persistentRecursiveWatches) {
+ for (String path :
PathParentIterator.forAll(clientPath).asIterable()) {
+ addTo(persistentRecursiveWatches.get(path), result);
+ }
+ }
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 0210493..201cf20 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,10 +39,7 @@ import org.apache.zookeeper.AsyncCallback.MultiCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoWatcherException;
import org.apache.zookeeper.OpResult.ErrorResult;
-import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.client.HostProvider;
@@ -85,7 +81,6 @@ import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.EphemeralType;
-import org.apache.zookeeper.server.watch.PathParentIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,8 +226,6 @@ public class ZooKeeper implements AutoCloseable {
return cnxn.zooKeeperSaslClient;
}
- protected final ZKWatchManager watchManager;
-
private final ZKClientConfig clientConfig;
public ZKClientConfig getClientConfig() {
@@ -240,407 +233,37 @@ public class ZooKeeper implements AutoCloseable {
}
protected List<String> getDataWatches() {
- synchronized (watchManager.dataWatches) {
- List<String> rc = new
ArrayList<String>(watchManager.dataWatches.keySet());
- return rc;
- }
+ return getWatchManager().getDataWatchList();
}
+
protected List<String> getExistWatches() {
- synchronized (watchManager.existWatches) {
- List<String> rc = new
ArrayList<String>(watchManager.existWatches.keySet());
- return rc;
- }
+ return getWatchManager().getExistWatchList();
}
+
protected List<String> getChildWatches() {
- synchronized (watchManager.childWatches) {
- List<String> rc = new
ArrayList<String>(watchManager.childWatches.keySet());
- return rc;
- }
+ return getWatchManager().getChildWatchList();
}
+
protected List<String> getPersistentWatches() {
- synchronized (watchManager.persistentWatches) {
- List<String> rc = new
ArrayList<String>(watchManager.persistentWatches.keySet());
- return rc;
- }
+ return getWatchManager().getPersistentWatchList();
}
+
protected List<String> getPersistentRecursiveWatches() {
- synchronized (watchManager.persistentRecursiveWatches) {
- List<String> rc = new
ArrayList<String>(watchManager.persistentRecursiveWatches.keySet());
- return rc;
- }
+ return getWatchManager().getPersistentRecursiveWatchList();
}
- /**
- * Manage watchers and handle events generated by the ClientCnxn object.
- *
- * We are implementing this as a nested class of ZooKeeper so that
- * the public methods will not be exposed as part of the ZooKeeper client
- * API.
- */
- static class ZKWatchManager implements ClientWatchManager {
-
- private final Map<String, Set<Watcher>> dataWatches = new
HashMap<String, Set<Watcher>>();
- private final Map<String, Set<Watcher>> existWatches = new
HashMap<String, Set<Watcher>>();
- private final Map<String, Set<Watcher>> childWatches = new
HashMap<String, Set<Watcher>>();
- private final Map<String, Set<Watcher>> persistentWatches = new
HashMap<String, Set<Watcher>>();
- private final Map<String, Set<Watcher>> persistentRecursiveWatches =
new HashMap<String, Set<Watcher>>();
- private boolean disableAutoWatchReset;
-
- ZKWatchManager(boolean disableAutoWatchReset) {
- this.disableAutoWatchReset = disableAutoWatchReset;
- }
-
- protected volatile Watcher defaultWatcher;
-
- private void addTo(Set<Watcher> from, Set<Watcher> to) {
- if (from != null) {
- to.addAll(from);
- }
- }
-
- public Map<EventType, Set<Watcher>> removeWatcher(
- String clientPath,
- Watcher watcher,
- WatcherType watcherType,
- boolean local,
- int rc) throws KeeperException {
- // Validate the provided znode path contains the given watcher of
- // watcherType
- containsWatcher(clientPath, watcher, watcherType);
-
- Map<EventType, Set<Watcher>> removedWatchers = new HashMap<>();
- HashSet<Watcher> childWatchersToRem = new HashSet<>();
- removedWatchers.put(EventType.ChildWatchRemoved,
childWatchersToRem);
- HashSet<Watcher> dataWatchersToRem = new HashSet<>();
- removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
- HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
- removedWatchers.put(EventType.PersistentWatchRemoved,
persistentWatchersToRem);
- boolean removedWatcher = false;
- switch (watcherType) {
- case Children: {
- synchronized (childWatches) {
- removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
- }
- break;
- }
- case Data: {
- synchronized (dataWatches) {
- removedWatcher = removeWatches(dataWatches, watcher,
clientPath, local, rc, dataWatchersToRem);
- }
-
- synchronized (existWatches) {
- boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
- removedWatcher |= removedDataWatcher;
- }
- break;
- }
- case Any: {
- synchronized (childWatches) {
- removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
- }
-
- synchronized (dataWatches) {
- boolean removedDataWatcher = removeWatches(dataWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
- removedWatcher |= removedDataWatcher;
- }
-
- synchronized (existWatches) {
- boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
- removedWatcher |= removedDataWatcher;
- }
-
- synchronized (persistentWatches) {
- boolean removedPersistentWatcher =
removeWatches(persistentWatches,
- watcher, clientPath, local, rc,
persistentWatchersToRem);
- removedWatcher |= removedPersistentWatcher;
- }
-
- synchronized (persistentRecursiveWatches) {
- boolean removedPersistentRecursiveWatcher =
removeWatches(persistentRecursiveWatches,
- watcher, clientPath, local, rc,
persistentWatchersToRem);
- removedWatcher |= removedPersistentRecursiveWatcher;
- }
- }
- }
- // Watcher function doesn't exists for the specified params
- if (!removedWatcher) {
- throw new KeeperException.NoWatcherException(clientPath);
- }
- return removedWatchers;
- }
-
- private boolean contains(String path, Watcher watcherObj, Map<String,
Set<Watcher>> pathVsWatchers) {
- boolean watcherExists = true;
- if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
- watcherExists = false;
- } else {
- Set<Watcher> watchers = pathVsWatchers.get(path);
- if (watchers == null) {
- watcherExists = false;
- } else if (watcherObj == null) {
- watcherExists = watchers.size() > 0;
- } else {
- watcherExists = watchers.contains(watcherObj);
- }
- }
- return watcherExists;
- }
-
- /**
- * Validate the provided znode path contains the given watcher and
- * watcherType
- *
- * @param path
- * - client path
- * @param watcher
- * - watcher object reference
- * @param watcherType
- * - type of the watcher
- * @throws NoWatcherException
- */
- void containsWatcher(String path, Watcher watcher, WatcherType
watcherType) throws NoWatcherException {
- boolean containsWatcher = false;
- switch (watcherType) {
- case Children: {
- synchronized (childWatches) {
- containsWatcher = contains(path, watcher, childWatches);
- }
-
- synchronized (persistentWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentRecursiveWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentRecursiveWatches);
- containsWatcher |= contains_temp;
- }
- break;
- }
- case Data: {
- synchronized (dataWatches) {
- containsWatcher = contains(path, watcher, dataWatches);
- }
-
- synchronized (existWatches) {
- boolean contains_temp = contains(path, watcher,
existWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentRecursiveWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentRecursiveWatches);
- containsWatcher |= contains_temp;
- }
- break;
- }
- case Any: {
- synchronized (childWatches) {
- containsWatcher = contains(path, watcher, childWatches);
- }
-
- synchronized (dataWatches) {
- boolean contains_temp = contains(path, watcher,
dataWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (existWatches) {
- boolean contains_temp = contains(path, watcher,
existWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentRecursiveWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentRecursiveWatches);
- containsWatcher |= contains_temp;
- }
- }
- }
- // Watcher function doesn't exists for the specified params
- if (!containsWatcher) {
- throw new KeeperException.NoWatcherException(path);
- }
- }
-
- protected boolean removeWatches(
- Map<String, Set<Watcher>> pathVsWatcher,
- Watcher watcher,
- String path,
- boolean local,
- int rc,
- Set<Watcher> removedWatchers) throws KeeperException {
- if (!local && rc != Code.OK.intValue()) {
- throw KeeperException.create(KeeperException.Code.get(rc), path);
- }
- boolean success = false;
- // When local flag is true, remove watchers for the given path
- // irrespective of rc. Otherwise shouldn't remove watchers locally
- // when sees failure from server.
- if (rc == Code.OK.intValue() || (local && rc != Code.OK.intValue())) {
- // Remove all the watchers for the given path
- if (watcher == null) {
- Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
- if (pathWatchers != null) {
- // found path watchers
- removedWatchers.addAll(pathWatchers);
- success = true;
- }
- } else {
- Set<Watcher> watchers = pathVsWatcher.get(path);
- if (watchers != null) {
- if (watchers.remove(watcher)) {
- // found path watcher
- removedWatchers.add(watcher);
- // cleanup <path vs watchlist>
- if (watchers.size() <= 0) {
- pathVsWatcher.remove(path);
- }
- success = true;
- }
- }
- }
- }
- return success;
- }
-
- /* (non-Javadoc)
- * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
- * Event.EventType, java.lang.String)
- */
- @Override
- public Set<Watcher> materialize(
- Watcher.Event.KeeperState state,
- Watcher.Event.EventType type,
- String clientPath
- ) {
- final Set<Watcher> result = new HashSet<>();
-
- switch (type) {
- case None:
- if (defaultWatcher != null) {
- result.add(defaultWatcher);
- }
-
- boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
- synchronized (dataWatches) {
- for (Set<Watcher> ws : dataWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- dataWatches.clear();
- }
- }
-
- synchronized (existWatches) {
- for (Set<Watcher> ws : existWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- existWatches.clear();
- }
- }
-
- synchronized (childWatches) {
- for (Set<Watcher> ws : childWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- childWatches.clear();
- }
- }
-
- synchronized (persistentWatches) {
- for (Set<Watcher> ws: persistentWatches.values()) {
- result.addAll(ws);
- }
- }
-
- synchronized (persistentRecursiveWatches) {
- for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
- result.addAll(ws);
- }
- }
-
- return result;
- case NodeDataChanged:
- case NodeCreated:
- synchronized (dataWatches) {
- addTo(dataWatches.remove(clientPath), result);
- }
- synchronized (existWatches) {
- addTo(existWatches.remove(clientPath), result);
- }
- addPersistentWatches(clientPath, result);
- break;
- case NodeChildrenChanged:
- synchronized (childWatches) {
- addTo(childWatches.remove(clientPath), result);
- }
- addPersistentWatches(clientPath, result);
- break;
- case NodeDeleted:
- synchronized (dataWatches) {
- addTo(dataWatches.remove(clientPath), result);
- }
- // TODO This shouldn't be needed, but just in case
- synchronized (existWatches) {
- Set<Watcher> list = existWatches.remove(clientPath);
- if (list != null) {
- addTo(list, result);
- LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
- }
- }
- synchronized (childWatches) {
- addTo(childWatches.remove(clientPath), result);
- }
- addPersistentWatches(clientPath, result);
- break;
- default:
- String errorMsg = String.format(
- "Unhandled watch event type %s with state %s on path %s",
- type,
- state,
- clientPath);
- LOG.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
-
- return result;
- }
-
- private void addPersistentWatches(String clientPath, Set<Watcher> result) {
- synchronized (persistentWatches) {
- addTo(persistentWatches.get(clientPath), result);
- }
- synchronized (persistentRecursiveWatches) {
- for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
- addTo(persistentRecursiveWatches.get(path), result);
- }
- }
- }
+ ZKWatchManager getWatchManager() {
+ return cnxn.getWatcherManager();
}
/**
* Register a watcher for a particular path.
*/
- public abstract class WatchRegistration {
+ public abstract static class WatchRegistration {
private Watcher watcher;
private String clientPath;
+
public WatchRegistration(Watcher watcher, String clientPath) {
this.watcher = watcher;
this.clientPath = clientPath;
@@ -689,7 +312,7 @@ public class ZooKeeper implements AutoCloseable {
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
- return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
+ return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();
}
@Override
@@ -707,7 +330,7 @@ public class ZooKeeper implements AutoCloseable {
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
- return watchManager.dataWatches;
+ return getWatchManager().getDataWatches();
}
}
@@ -720,7 +343,7 @@ public class ZooKeeper implements AutoCloseable {
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
- return watchManager.childWatches;
+ return getWatchManager().getChildWatches();
}
}
@@ -737,9 +360,9 @@ public class ZooKeeper implements AutoCloseable {
protected Map<String, Set<Watcher>> getWatches(int rc) {
switch (mode) {
case PERSISTENT:
- return watchManager.persistentWatches;
+ return getWatchManager().getPersistentWatches();
case PERSISTENT_RECURSIVE:
- return watchManager.persistentRecursiveWatches;
+ return getWatchManager().getPersistentRecursiveWatches();
}
throw new IllegalArgumentException("Mode not supported: " + mode);
}
@@ -989,7 +612,7 @@ public class ZooKeeper implements AutoCloseable {
* connects to one in read-only mode, i.e. read requests are
* allowed while write requests are not. It continues seeking for
* majority in the background.
- * @param aHostProvider
+ * @param hostProvider
* use this as HostProvider to enable custom behaviour.
* @param clientConfig
* (added in 3.5.2) passing this conf object gives each client the flexibility of
@@ -1004,49 +627,45 @@ public class ZooKeeper implements AutoCloseable {
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
- HostProvider aHostProvider,
- ZKClientConfig clientConfig) throws IOException {
+ HostProvider hostProvider,
+ ZKClientConfig clientConfig
+ ) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
- if (clientConfig == null) {
- clientConfig = new ZKClientConfig();
- }
- this.clientConfig = clientConfig;
- watchManager = defaultWatchManager();
- watchManager.defaultWatcher = watcher;
+ this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
+ this.hostProvider = hostProvider;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
- hostProvider = aHostProvider;
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
- this,
- watchManager,
+ this.clientConfig,
+ watcher,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
- // @VisibleForTesting
- protected ClientCnxn createConnection(
+ ClientCnxn createConnection(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
+ boolean canBeReadOnly
+ ) throws IOException {
return new ClientCnxn(
chrootPath,
hostProvider,
sessionTimeout,
- this,
- watchManager,
+ clientConfig,
+ defaultWatcher,
clientCnxnSocket,
canBeReadOnly);
}
@@ -1383,7 +1002,7 @@ public class ZooKeeper implements AutoCloseable {
* connects to one in read-only mode, i.e. read requests are
* allowed while write requests are not. It continues seeking for
* majority in the background.
- * @param aHostProvider
+ * @param hostProvider
* use this as HostProvider to enable custom behaviour.
* @param clientConfig
* (added in 3.5.2) passing this conf object gives each client the flexibility of
@@ -1400,7 +1019,7 @@ public class ZooKeeper implements AutoCloseable {
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly,
- HostProvider aHostProvider,
+ HostProvider hostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} "
@@ -1411,22 +1030,16 @@ public class ZooKeeper implements AutoCloseable {
Long.toHexString(sessionId),
(sessionPasswd == null ? "<null>" : "<hidden>"));
- if (clientConfig == null) {
- clientConfig = new ZKClientConfig();
- }
- this.clientConfig = clientConfig;
- watchManager = defaultWatchManager();
- watchManager.defaultWatcher = watcher;
-
+ this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
- hostProvider = aHostProvider;
+ this.hostProvider = hostProvider;
cnxn = new ClientCnxn(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
- this,
- watchManager,
+ this.clientConfig,
+ watcher,
getClientCnxnSocket(),
sessionId,
sessionPasswd,
@@ -1523,11 +1136,6 @@ public class ZooKeeper implements AutoCloseable {
return new ZooKeeperTestable(cnxn);
}
- /* Useful for testing watch handling behavior */
- protected ZKWatchManager defaultWatchManager() {
- return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
- }
-
/**
* The session id for this ZooKeeper client instance. The value returned is
* not valid until the client connects to a server and may change after a
@@ -1582,11 +1190,9 @@ public class ZooKeeper implements AutoCloseable {
/**
* Specify the default watcher for the connection (overrides the one
* specified during construction).
- *
- * @param watcher
*/
public synchronized void register(Watcher watcher) {
- watchManager.defaultWatcher = watcher;
+ getWatchManager().setDefaultWatcher(watcher);
}
/**
@@ -3213,9 +2819,11 @@ public class ZooKeeper implements AutoCloseable {
* error code.
* @since 3.6.0
*/
- public void addWatch(String basePath, AddWatchMode mode)
- throws KeeperException, InterruptedException {
- addWatch(basePath, watchManager.defaultWatcher, mode);
+ public void addWatch(
+ String basePath,
+ AddWatchMode mode
+ ) throws KeeperException, InterruptedException {
+ addWatch(basePath, getWatchManager().getDefaultWatcher(), mode);
}
/**
@@ -3229,8 +2837,12 @@ public class ZooKeeper implements AutoCloseable {
* @throws IllegalArgumentException if an invalid path is specified
* @since 3.6.0
*/
- public void addWatch(String basePath, Watcher watcher, AddWatchMode mode,
- VoidCallback cb, Object ctx) {
+ public void addWatch(
+ String basePath,
+ Watcher watcher, AddWatchMode mode,
+ VoidCallback cb,
+ Object ctx
+ ) {
PathUtils.validatePath(basePath);
String serverPath = prependChroot(basePath);
@@ -3251,9 +2863,8 @@ public class ZooKeeper implements AutoCloseable {
* @throws IllegalArgumentException if an invalid path is specified
* @since 3.6.0
*/
- public void addWatch(String basePath, AddWatchMode mode,
- VoidCallback cb, Object ctx) {
- addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx);
+ public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) {
+ addWatch(basePath, getWatchManager().getDefaultWatcher(), mode, cb, ctx);
}
private void validateWatcher(Watcher watcher) {
@@ -3271,7 +2882,7 @@ public class ZooKeeper implements AutoCloseable {
PathUtils.validatePath(path);
final String clientPath = path;
final String serverPath = prependChroot(clientPath);
- WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
+ WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
RequestHeader h = new RequestHeader();
h.setType(opCode);
@@ -3294,7 +2905,7 @@ public class ZooKeeper implements AutoCloseable {
PathUtils.validatePath(path);
final String clientPath = path;
final String serverPath = prependChroot(clientPath);
- WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
+ WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
RequestHeader h = new RequestHeader();
h.setType(opCode);
@@ -3424,8 +3035,9 @@ public class ZooKeeper implements AutoCloseable {
*/
private Watcher getDefaultWatcher(boolean required) {
if (required) {
- if (watchManager.defaultWatcher != null) {
- return watchManager.defaultWatcher;
+ final Watcher defaultWatcher = getWatchManager().getDefaultWatcher();
+ if (defaultWatcher != null) {
+ return defaultWatcher;
} else {
throw new IllegalStateException("Default watcher is required, but it is null.");
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 2c7f6bd..5ed43fc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -281,11 +281,19 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig zkClientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
- super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ boolean canBeReadOnly
+ ) throws IOException {
+ super(
+ chrootPath,
+ hostProvider,
+ sessionTimeout,
+ zkClientConfig,
+ defaultWatcher,
+ clientCnxnSocket,
+ canBeReadOnly);
}
void attemptClose() {
@@ -344,18 +352,25 @@ public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
return cnxn.getState().isAlive();
}
- @Override
- protected ClientCnxn createConnection(
+ ClientCnxn createConnection(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
+ boolean canBeReadOnly
+ ) throws IOException {
Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
- ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
+ chrootPath,
+ hostProvider,
+ sessionTimeout,
+ clientConfig,
+ defaultWatcher,
+ clientCnxnSocket,
+ canBeReadOnly);
return ClientCnxnSocketFragilityTest.this.cnxn;
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
index e77283c..41f52ea 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java
@@ -66,8 +66,14 @@ public class ClientReconnectTest extends ZKTestCase {
sc = SocketChannel.open();
ClientCnxnSocketNIO nioCnxn = new MockCnxn();
- ClientWatchManager watcher = mock(ClientWatchManager.class);
- ClientCnxn clientCnxn = new ClientCnxn("tmp", hostProvider, 5000, zk, watcher, nioCnxn, false);
+ ClientCnxn clientCnxn = new ClientCnxn(
+ "tmp",
+ hostProvider,
+ 5000,
+ zk.getClientConfig(),
+ DummyWatcher.INSTANCE,
+ nioCnxn,
+ false);
clientCnxn.start();
countDownLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue(countDownLatch.getCount() == 0);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index 7c3bf51..d4f95ae 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -110,15 +111,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
class CustomClientCnxn extends ClientCnxn {
- public CustomClientCnxn(
+ CustomClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
- super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ boolean canBeReadOnly
+ ) throws IOException {
+ super(
+ chrootPath,
+ hostProvider,
+ sessionTimeout,
+ clientConfig,
+ defaultWatcher,
+ clientCnxnSocket,
+ canBeReadOnly);
}
@Override
@@ -140,15 +149,23 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
}
@Override
- protected ClientCnxn createConnection(
+ ClientCnxn createConnection(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
- return new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ boolean canBeReadOnly
+ ) throws IOException {
+ return new CustomClientCnxn(
+ chrootPath,
+ hostProvider,
+ sessionTimeout,
+ clientConfig,
+ defaultWatcher,
+ clientCnxnSocket,
+ canBeReadOnly);
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
index 30f1558..8bb91e3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
@@ -18,12 +18,16 @@
package org.apache.zookeeper;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,11 +41,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoWatcherException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
@@ -657,7 +659,9 @@ public class RemoveWatchesTest extends ClientBase {
@Test(timeout = 90000)
public void testNoWatcherServerException() throws InterruptedException, IOException, TimeoutException {
CountdownWatcher watcher = new CountdownWatcher();
- MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
+ ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher));
+ MyWatchManager watchManager = new MyWatchManager(false, watcher);
+ doReturn(watchManager).when(zk).getWatchManager();
boolean nw = false;
watcher.waitForConnected(CONNECTION_TIMEOUT);
@@ -670,8 +674,8 @@ public class RemoveWatchesTest extends ClientBase {
}
}
- assertTrue("Server didn't return NOWATCHER", zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue());
- assertTrue("NoWatcherException didn't happen", nw);
+ assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
+ assertThat("NoWatcherException didn't happen", nw, is(true));
}
/**
@@ -920,54 +924,32 @@ public class RemoveWatchesTest extends ClientBase {
assertEquals("Received watch notification after removal!", 2, watchCount.getCount());
}
- /* a mocked ZK class that doesn't do client-side verification
- * before/after calling removeWatches */
- private class MyZooKeeper extends ZooKeeper {
+ private static class MyWatchManager extends ZKWatchManager {
- class MyWatchManager extends ZKWatchManager {
+ int lastReturnCode;
- public MyWatchManager(boolean disableAutoWatchReset) {
- super(disableAutoWatchReset);
- }
-
- public int lastrc;
-
- /* Pretend that any watcher exists */
- void containsWatcher(String path, Watcher watcher, WatcherType watcherType) throws NoWatcherException {
- }
-
- /* save the return error code by the server */
- protected boolean removeWatches(
- Map<String, Set<Watcher>> pathVsWatcher,
- Watcher watcher,
- String path,
- boolean local,
- int rc,
- Set<Watcher> removedWatchers) throws KeeperException {
- lastrc = rc;
- return false;
- }
-
- }
-
- public MyZooKeeper(String hp, int timeout, Watcher watcher) throws IOException {
- super(hp, timeout, watcher, false);
+ MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
+ super(disableAutoWatchReset, defaultWatcher);
}
- private MyWatchManager myWatchManager;
-
- protected ZKWatchManager defaultWatchManager() {
- myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
- return myWatchManager;
+ void containsWatcher(String path, Watcher watcher, WatcherType watcherType) {
+ // prevent contains watcher
}
- public int getRemoveWatchesRC() {
- return myWatchManager.lastrc;
+ @Override
+ protected boolean removeWatches(
+ Map<String, Set<Watcher>> pathVsWatcher,
+ Watcher watcher,
+ String path,
+ boolean local,
+ int rc,
+ Set<Watcher> removedWatchers) {
+ lastReturnCode = rc;
+ return false;
}
-
}
- private class MyWatcher implements Watcher {
+ private static class MyWatcher implements Watcher {
private final String path;
private String eventPath;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
index a02127a..7f9e41e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java
@@ -20,12 +20,10 @@ package org.apache.zookeeper;
import java.io.IOException;
import java.net.SocketAddress;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.jute.Record;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
@@ -35,76 +33,12 @@ public class TestableZooKeeper extends ZooKeeperAdmin {
super(host, sessionTimeout, watcher);
}
- class TestableClientCnxn extends ClientCnxn {
-
- TestableClientCnxn(
- String chrootPath,
- HostProvider hostProvider,
- int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
- ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
- super(chrootPath,
- hostProvider,
- sessionTimeout,
- zooKeeper,
- watcher,
- clientCnxnSocket,
- 0,
- new byte[16],
- canBeReadOnly);
- }
-
- void setXid(int newXid) {
- xid = newXid;
- }
-
- int checkXid() {
- return xid;
- }
-
- }
-
- protected ClientCnxn createConnection(
- String chrootPath,
- HostProvider hostProvider,
- int sessionTimeout,
- ZooKeeper zooKeeper,
- ClientWatchManager watcher,
- ClientCnxnSocket clientCnxnSocket,
- boolean canBeReadOnly) throws IOException {
- return new TestableClientCnxn(
- chrootPath,
- hostProvider,
- sessionTimeout,
- this,
- watcher,
- clientCnxnSocket,
- canBeReadOnly);
- }
-
public void setXid(int xid) {
- ((TestableClientCnxn) cnxn).setXid(xid);
+ cnxn.xid = xid;
}
public int checkXid() {
- return ((TestableClientCnxn) cnxn).checkXid();
- }
-
- @Override
- public List<String> getChildWatches() {
- return super.getChildWatches();
- }
-
- @Override
- public List<String> getDataWatches() {
- return super.getDataWatches();
- }
-
- @Override
- public List<String> getExistWatches() {
- return super.getExistWatches();
+ return cnxn.xid;
}
/**