This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5536393 ZOOKEEPER-1416 - Persistent, recursive watchers
5536393 is described below
commit 553639378d5cf86c2488afff4586e5e4cce38061
Author: randgalt <[email protected]>
AuthorDate: Fri Nov 8 17:30:25 2019 +0100
ZOOKEEPER-1416 - Persistent, recursive watchers
### Background
Note: this is a port of https://github.com/apache/zookeeper/pull/136
Implementation for a persistent, recursive watch addition for ZK. These
watches are set via a new method, addWatch(), and are removed via the
existing watcher removal methods. Persistent, recursive watches have these
characteristics: a) Once set, they do not auto-remove when triggered; b) they
trigger for all event types (child, data, etc.) on the node they are registered
for and any child znode recursively; c) they are efficiently implemented by
using the existing watch inte [...]
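
As a rough illustration of characteristic (a), the toy registry below contrasts a one-time (standard) watch with a persistent one. It is only a sketch: `WatchSemanticsSketch`, `addStandard`, `addPersistent`, and `fire` are hypothetical names, watchers are plain strings, and none of this is the ZooKeeper client code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: hypothetical names, not the ZooKeeper client code. */
public class WatchSemanticsSketch {
    // path -> watcher names; standard watches are consumed when they fire
    private final Map<String, List<String>> standard = new HashMap<>();
    // path -> watcher names; persistent watches stay registered after firing
    private final Map<String, List<String>> persistent = new HashMap<>();

    public void addStandard(String path, String watcher) {
        standard.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
    }

    public void addPersistent(String path, String watcher) {
        persistent.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
    }

    /** Returns the watchers notified by an event on the given path. */
    public List<String> fire(String path) {
        List<String> notified = new ArrayList<>();
        // remove(): the standard entry disappears, so it triggers only once
        List<String> oneShot = standard.remove(path);
        if (oneShot != null) {
            notified.addAll(oneShot);
        }
        // get(): the persistent entry is left in place for later events
        List<String> kept = persistent.get(path);
        if (kept != null) {
            notified.addAll(kept);
        }
        return notified;
    }

    public static void main(String[] args) {
        WatchSemanticsSketch registry = new WatchSemanticsSketch();
        registry.addStandard("/app", "oneShot");
        registry.addPersistent("/app", "sticky");
        System.out.println(registry.fire("/app")); // prints [oneShot, sticky]
        System.out.println(registry.fire("/app")); // prints [sticky]
    }
}
```

The only difference between the two kinds of watch in this model is `remove()` versus `get()` on the lookup.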
### Implementation Details
- A new enum manages the different "modes" for watchers: `WatcherMode`.
- For traditional, "standard" watchers, the code path is almost exactly the
same. There is very little overhead other than a few extra checks in
`WatchManager`.
- Given how this is implemented it was difficult to add support when
`WatchManagerOptimized` is used. I'm open to adding it for that version but it
will take work. We should consider not supporting persistent/recursive watchers
when WatchManagerOptimized is used. I notice that `WatchManagerOptimized` is
not even mentioned in the docs.
- The mode for a given watcher/path pair is held in a map inside of
`WatcherModeManager`. The absence of an entry means Standard. This way, there's
no overhead for old, standard watchers.
- `PathParentIterator` is the "meat" of the implementation. Rather than setting
watchers on every ZNode implied by a recursive watcher, `WatchManager` passes any
paths it processes through `PathParentIterator`, which iterates up each parent
znode looking for watchers.
- The remainder of the changes are scaffolding to match how other watchers
are used, as well as Jute/API changes to set persistent/recursive watchers.
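
The mode map and the parent-path walk described in the list above can be sketched roughly as follows. This is a simplified, self-contained illustration under stated assumptions, not the code in this commit: `RecursiveLookupSketch` and its methods are hypothetical names, watchers are plain strings, paths are assumed to be absolute and already validated, and removal of standard watchers on trigger is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch; class and method names are hypothetical. */
public class RecursiveLookupSketch {
    public enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // path -> watcher names registered on exactly that path
    private final Map<String, Set<String>> watchTable = new HashMap<>();
    // "watcher@path" -> mode; a missing entry means STANDARD
    private final Map<String, Mode> modes = new HashMap<>();

    public void addWatch(String path, String watcher, Mode mode) {
        watchTable.computeIfAbsent(path, k -> new LinkedHashSet<>()).add(watcher);
        if (mode != Mode.STANDARD) {
            modes.put(watcher + "@" + path, mode); // standard watchers cost nothing here
        }
    }

    public Mode modeOf(String path, String watcher) {
        return modes.getOrDefault(watcher + "@" + path, Mode.STANDARD);
    }

    /** Returns the path itself followed by each ancestor, ending at "/". */
    public static List<String> selfAndParents(String path) {
        List<String> result = new ArrayList<>();
        String current = path; // assumed to start with '/'
        while (true) {
            result.add(current);
            if (current.equals("/")) {
                break;
            }
            int lastSlash = current.lastIndexOf('/');
            current = (lastSlash == 0) ? "/" : current.substring(0, lastSlash);
        }
        return result;
    }

    /** Watchers to notify for an event on eventPath. */
    public List<String> watchersFor(String eventPath) {
        List<String> notified = new ArrayList<>();
        for (String candidate : selfAndParents(eventPath)) {
            boolean exact = candidate.equals(eventPath);
            Set<String> registered = watchTable.getOrDefault(candidate, Collections.emptySet());
            for (String watcher : registered) {
                // A watcher on an ancestor matches only if it is recursive.
                if (exact || modeOf(candidate, watcher) == Mode.PERSISTENT_RECURSIVE) {
                    notified.add(watcher);
                }
            }
        }
        return notified;
    }
}
```

With a recursive watcher on `/a`, an event on `/a/b/c` is matched by checking `/a/b/c`, `/a/b`, `/a`, and `/` in turn, so nothing has to be registered on the child znodes themselves.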
### Testing
The tests were written years ago. I think they're comprehensive but
reviewers should pay attention to anything that was missed. There is much
ZooKeeper knowledge that's only in the heads of ZK committers.
- `PersistentWatcherTest` - tests persistent, non-recursive watchers
- `PersistentRecursiveWatcherTest` - tests persistent, recursive watchers
- `PathParentIteratorTest` - exercises edge cases of `PathParentIterator`
Author: randgalt <[email protected]>
Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar
<[email protected]>, Andor Molnár <[email protected]>, Justin Mao Ling
<[email protected]>
Closes #1106 from Randgalt/ZOOKEEPER-1416
---
.../src/main/resources/markdown/zookeeperOver.md | 5 +
.../resources/markdown/zookeeperProgrammers.md | 27 ++-
zookeeper-jute/src/main/resources/zookeeper.jute | 12 ++
.../java/org/apache/zookeeper/AddWatchMode.java | 67 +++++++
.../main/java/org/apache/zookeeper/ClientCnxn.java | 36 +++-
.../main/java/org/apache/zookeeper/Watcher.java | 5 +-
.../main/java/org/apache/zookeeper/ZooDefs.java | 11 ++
.../main/java/org/apache/zookeeper/ZooKeeper.java | 204 ++++++++++++++++++++
.../java/org/apache/zookeeper/ZooKeeperMain.java | 2 +
.../org/apache/zookeeper/cli/AddWatchCommand.java | 87 +++++++++
.../java/org/apache/zookeeper/server/DataTree.java | 18 +-
.../zookeeper/server/FinalRequestProcessor.java | 33 +++-
.../zookeeper/server/PrepRequestProcessor.java | 2 +
.../java/org/apache/zookeeper/server/Request.java | 5 +
.../org/apache/zookeeper/server/ZKDatabase.java | 21 +-
.../server/util/RequestPathMetricsCollector.java | 2 +
.../zookeeper/server/watch/IWatchManager.java | 24 +++
.../zookeeper/server/watch/PathParentIterator.java | 106 +++++++++++
.../zookeeper/server/watch/WatchManager.java | 142 ++++++++++----
.../apache/zookeeper/server/watch/WatcherMode.java | 56 ++++++
.../zookeeper/server/watch/WatcherModeManager.java | 96 ++++++++++
.../server/watch/PathParentIteratorTest.java | 84 ++++++++
.../server/watch/RecursiveWatchQtyTest.java | 197 +++++++++++++++++++
.../zookeeper/server/watch/WatchManagerTest.java | 2 +-
.../test/PersistentRecursiveWatcherTest.java | 174 +++++++++++++++++
.../zookeeper/test/PersistentWatcherTest.java | 211 +++++++++++++++++++++
.../zookeeper/test/UnsupportedAddWatcherTest.java | 124 ++++++++++++
27 files changed, 1699 insertions(+), 54 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperOver.md b/zookeeper-docs/src/main/resources/markdown/zookeeperOver.md
index 1c00609..4c60a3d 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperOver.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperOver.md
@@ -146,6 +146,11 @@ receives a packet saying that the znode has changed. If the
connection between the client and one of the ZooKeeper servers is
broken, the client will receive a local notification.
+**New in 3.6.0:** Clients can also set
+permanent, recursive watches on a znode that are not removed when triggered
+and that trigger for changes on the registered znode as well as any children
+znodes recursively.
+
<a name="Guarantees"></a>
### Guarantees
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
index 03e83c0..08d30c8 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
@@ -32,6 +32,7 @@ limitations under the License.
* [ZooKeeper Sessions](#ch_zkSessions)
* [ZooKeeper Watches](#ch_zkWatches)
* [Semantics of Watches](#sc_WatchSemantics)
+ * [Persistent, Recursive Watches](#sc_WatchPersistentRecursive)
* [Remove Watches](#sc_WatchRemoval)
* [What ZooKeeper Guarantees about Watches](#sc_WatchGuarantees)
* [Things to Remember about Watches](#sc_WatchRememberThese)
@@ -640,6 +641,11 @@ general this all occurs transparently. There is one case where a watch
may be missed: a watch for the existence of a znode not yet created will
be missed if the znode is created and deleted while disconnected.
+**New in 3.6.0:** Clients can also set
+permanent, recursive watches on a znode that are not removed when triggered
+and that trigger for changes on the registered znode as well as any children
+znodes recursively.
+
<a name="sc_WatchSemantics"></a>
### Semantics of Watches
@@ -657,6 +663,21 @@ the events that a watch can trigger and the calls that enable them:
* **Child event:**
Enabled with a call to getChildren.
+<a name="sc_WatchPersistentRecursive"></a>
+
+### Persistent, Recursive Watches
+
+**New in 3.6.0:** There is now a variation on the standard
+watch described above whereby you can set a watch that does not get removed when triggered.
+Additionally, these watches trigger the event types *NodeCreated*, *NodeDeleted*, and *NodeDataChanged*
+and, optionally, recursively for all znodes starting at the znode that the watch is registered for. Note
+that *NodeChildrenChanged* events are not triggered for persistent recursive watches as it would be redundant.
+
+Persistent watches are set using the method *addWatch()*. The triggering semantics and guarantees
+(other than one-time triggering) are the same as standard watches. The only exception regarding events is that
+recursive persistent watchers never trigger child changed events as they are redundant.
+Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*.
+
<a name="sc_WatchRemoval"></a>
### Remove Watches
@@ -671,6 +692,8 @@ successful watch removal.
Watcher which was added with a call to getChildren.
* **Data Remove event:**
Watcher which was added with a call to exists or getData.
+* **Persistent Remove event:**
+ Watcher which was added with a call to add a persistent watch.
<a name="sc_WatchGuarantees"></a>
@@ -693,11 +716,11 @@ guarantees:
### Things to Remember about Watches
-* Watches are one time triggers; if you get a watch event and
+* Standard watches are one time triggers; if you get a watch event and
you want to get notified of future changes, you must set another
watch.
-* Because watches are one time triggers and there is latency
+* Because standard watches are one time triggers and there is latency
between getting the event and sending a new request to get a watch
you cannot reliably see every change that happens to a node in
ZooKeeper. Be prepared to handle the case where the znode changes
diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute
index 8310664..6d55365 100644
--- a/zookeeper-jute/src/main/resources/zookeeper.jute
+++ b/zookeeper-jute/src/main/resources/zookeeper.jute
@@ -73,6 +73,14 @@ module org.apache.zookeeper.proto {
vector<ustring>existWatches;
vector<ustring>childWatches;
}
+ class SetWatches2 {
+ long relativeZxid;
+ vector<ustring>dataWatches;
+ vector<ustring>existWatches;
+ vector<ustring>childWatches;
+ vector<ustring>persistentWatches;
+ vector<ustring>persistentRecursiveWatches;
+ }
class RequestHeader {
int xid;
int type;
@@ -180,6 +188,10 @@ module org.apache.zookeeper.proto {
class SetACLResponse {
org.apache.zookeeper.data.Stat stat;
}
+ class AddWatchRequest {
+ ustring path;
+ int mode;
+ }
class WatcherEvent {
int type; // event type
int state; // state of the Keeper client runtime
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/AddWatchMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/AddWatchMode.java
new file mode 100644
index 0000000..0f339c1
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/AddWatchMode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/**
+ * Modes available to {@link ZooKeeper#addWatch(String, Watcher, AddWatchMode)}
+ */
+public enum AddWatchMode {
+ /**
+ * <p>
+ * Set a watcher on the given path that does not get removed when triggered (i.e. it stays active
+ * until it is removed). This watcher
+ * is triggered for both data and child events. To remove the watcher, use
+ * <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>. The watcher behaves as if you placed an exists() watch and
+ * a getData() watch on the ZNode at the given path.
+ * </p>
+ */
+ PERSISTENT(ZooDefs.AddWatchModes.persistent),
+
+ /**
+ * <p>
+ * Set a watcher on the given path that: a) does not get removed when triggered (i.e. it stays active
+ * until it is removed); b) applies not only to the registered path but all child paths recursively. This watcher
+ * is triggered for both data and child events. To remove the watcher, use
+ * <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>
+ * </p>
+ *
+ * <p>
+ * The watcher behaves as if you placed an exists() watch and
+ * a getData() watch on the ZNode at the given path <strong>and</strong> any ZNodes that are children
+ * of the given path including children added later.
+ * </p>
+ *
+ * <p>
+ * NOTE: when there are active recursive watches there is a small performance decrease as all segments
+ * of ZNode paths must be checked for watch triggering.
+ * </p>
+ */
+ PERSISTENT_RECURSIVE(ZooDefs.AddWatchModes.persistentRecursive)
+ ;
+
+ public int getMode() {
+ return mode;
+ }
+
+ private final int mode;
+
+ AddWatchMode(int mode) {
+ this.mode = mode;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index b0f7b07..3713646 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -86,6 +86,7 @@ import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.proto.SetWatches2;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ZooKeeperThread;
@@ -990,16 +991,24 @@ public class ClientCnxn {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
- if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) {
+ List<String> persistentWatches = zooKeeper.getPersistentWatches();
+ List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
+ if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
+ || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
+ Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
+ Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;
- while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
+ while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
+ || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
+ List<String> persistentWatchesBatch = new ArrayList<String>();
+ List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
@@ -1015,15 +1024,32 @@ public class ClientCnxn {
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
+ } else if (persistentWatchesIter.hasNext()) {
+ watch = persistentWatchesIter.next();
+ persistentWatchesBatch.add(watch);
+ } else if (persistentRecursiveWatchesIter.hasNext()) {
+ watch = persistentRecursiveWatchesIter.next();
+ persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
- SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
- RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
- Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
+ Record record;
+ int opcode;
+ if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
+ // maintain compatibility with older servers - if no persistent/recursive watchers
+ // are used, use the old version of SetWatches
+ record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
+ opcode = OpCode.setWatches;
+ } else {
+ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
+ childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
+ opcode = OpCode.setWatches2;
+ }
+ RequestHeader header = new RequestHeader(-8, opcode);
+ Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
index db81fdf..ab4b654 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
@@ -143,7 +143,8 @@ public interface Watcher {
NodeDataChanged(3),
NodeChildrenChanged(4),
DataWatchRemoved(5),
- ChildWatchRemoved(6);
+ ChildWatchRemoved(6),
+ PersistentWatchRemoved (7);
private final int intValue; // Integer representation of value
// for sending over wire
@@ -172,6 +173,8 @@ public interface Watcher {
return EventType.DataWatchRemoved;
case 6:
return EventType.ChildWatchRemoved;
+ case 7:
+ return EventType.PersistentWatchRemoved;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
index db176f5..a12e580 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
@@ -89,6 +89,10 @@ public class ZooDefs {
int getAllChildrenNumber = 104;
+ int setWatches2 = 105;
+
+ int addWatch = 106;
+
int createSession = -10;
int closeSession = -11;
@@ -148,6 +152,13 @@ public class ZooDefs {
}
+ @InterfaceAudience.Public
+ public interface AddWatchModes {
+ int persistent = 0; // matches AddWatchMode.PERSISTENT
+
+ int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
+ }
+
public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 7f3b848..f6f165d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -53,12 +53,14 @@ import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.CreateTTLRequest;
import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ErrorResponse;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.GetACLRequest;
import org.apache.zookeeper.proto.GetACLResponse;
@@ -83,6 +85,7 @@ import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.EphemeralType;
+import org.apache.zookeeper.server.watch.PathParentIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,6 +257,18 @@ public class ZooKeeper implements AutoCloseable {
return rc;
}
}
+ protected List<String> getPersistentWatches() {
+ synchronized (watchManager.persistentWatches) {
+ List<String> rc = new ArrayList<String>(watchManager.persistentWatches.keySet());
+ return rc;
+ }
+ }
+ protected List<String> getPersistentRecursiveWatches() {
+ synchronized (watchManager.persistentRecursiveWatches) {
+ List<String> rc = new ArrayList<String>(watchManager.persistentRecursiveWatches.keySet());
+ return rc;
+ }
+ }
/**
* Manage watchers and handle events generated by the ClientCnxn object.
@@ -267,6 +282,8 @@ public class ZooKeeper implements AutoCloseable {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
+ private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
+ private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
ZKWatchManager(boolean disableAutoWatchReset) {
@@ -296,6 +313,8 @@ public class ZooKeeper implements AutoCloseable {
removedWatchers.put(EventType.ChildWatchRemoved, childWatchersToRem);
HashSet<Watcher> dataWatchersToRem = new HashSet<>();
removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
+ HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
+ removedWatchers.put(EventType.PersistentWatchRemoved, persistentWatchersToRem);
boolean removedWatcher = false;
switch (watcherType) {
case Children: {
@@ -324,10 +343,23 @@ public class ZooKeeper implements AutoCloseable {
boolean removedDataWatcher = removeWatches(dataWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
removedWatcher |= removedDataWatcher;
}
+
synchronized (existWatches) {
boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
removedWatcher |= removedDataWatcher;
}
+
+ synchronized (persistentWatches) {
+ boolean removedPersistentWatcher = removeWatches(persistentWatches,
+ watcher, clientPath, local, rc, persistentWatchersToRem);
+ removedWatcher |= removedPersistentWatcher;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean removedPersistentRecursiveWatcher = removeWatches(persistentRecursiveWatches,
+ watcher, clientPath, local, rc, persistentWatchersToRem);
+ removedWatcher |= removedPersistentRecursiveWatcher;
+ }
}
}
// Watcher function doesn't exists for the specified params
@@ -373,6 +405,18 @@ public class ZooKeeper implements AutoCloseable {
synchronized (childWatches) {
containsWatcher = contains(path, watcher, childWatches);
}
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
break;
}
case Data: {
@@ -384,6 +428,18 @@ public class ZooKeeper implements AutoCloseable {
boolean contains_temp = contains(path, watcher,
existWatches);
containsWatcher |= contains_temp;
}
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
break;
}
case Any: {
@@ -395,10 +451,23 @@ public class ZooKeeper implements AutoCloseable {
boolean contains_temp = contains(path, watcher,
dataWatches);
containsWatcher |= contains_temp;
}
+
synchronized (existWatches) {
boolean contains_temp = contains(path, watcher,
existWatches);
containsWatcher |= contains_temp;
}
+
+ synchronized (persistentWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentWatches);
+ containsWatcher |= contains_temp;
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ boolean contains_temp = contains(path, watcher,
+ persistentRecursiveWatches);
+ containsWatcher |= contains_temp;
+ }
}
}
// Watcher function doesn't exists for the specified params
@@ -490,6 +559,18 @@ public class ZooKeeper implements AutoCloseable {
}
}
+ synchronized (persistentWatches) {
+ for (Set<Watcher> ws: persistentWatches.values()) {
+ result.addAll(ws);
+ }
+ }
+
+ synchronized (persistentRecursiveWatches) {
+ for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
+ result.addAll(ws);
+ }
+ }
+
return result;
case NodeDataChanged:
case NodeCreated:
@@ -499,11 +580,13 @@ public class ZooKeeper implements AutoCloseable {
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
+ addPersistentWatches(clientPath, result);
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
+ addPersistentWatches(clientPath, result);
break;
case NodeDeleted:
synchronized (dataWatches) {
@@ -520,6 +603,7 @@ public class ZooKeeper implements AutoCloseable {
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
+ addPersistentWatches(clientPath, result);
break;
default:
String errorMsg = String.format(
@@ -534,6 +618,16 @@ public class ZooKeeper implements AutoCloseable {
return result;
}
+ private void addPersistentWatches(String clientPath, Set<Watcher> result) {
+ synchronized (persistentWatches) {
+ addTo(persistentWatches.get(clientPath), result);
+ }
+ synchronized (persistentRecursiveWatches) {
+ for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
+ addTo(persistentRecursiveWatches.get(path), result);
+ }
+ }
+ }
}
/**
@@ -627,6 +721,31 @@ public class ZooKeeper implements AutoCloseable {
}
+ class AddWatchRegistration extends WatchRegistration {
+ private final AddWatchMode mode;
+
+ public AddWatchRegistration(Watcher watcher, String clientPath, AddWatchMode mode) {
+ super(watcher, clientPath);
+ this.mode = mode;
+ }
+
+ @Override
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
+ switch (mode) {
+ case PERSISTENT:
+ return watchManager.persistentWatches;
+ case PERSISTENT_RECURSIVE:
+ return watchManager.persistentRecursiveWatches;
+ }
+ throw new IllegalArgumentException("Mode not supported: " + mode);
+ }
+
+ @Override
+ protected boolean shouldAddWatch(int rc) {
+ return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
+ }
+ }
+
@InterfaceAudience.Public
public enum States {
CONNECTING,
@@ -3035,6 +3154,91 @@ public class ZooKeeper implements AutoCloseable {
removeWatches(ZooDefs.OpCode.removeWatches, path, null, watcherType,
local, cb, ctx);
}
+ /**
+ * Add a watch to the given znode using the given mode. Note: not all
+ * watch types can be set with this method. Only the modes available
+ * in {@link AddWatchMode} can be set with this method.
+ *
+ * @param basePath the path that the watcher applies to
+ * @param watcher the watcher
+ * @param mode type of watcher to add
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws KeeperException If the server signals an error with a non-zero
+ * error code.
+ * @since 3.6.0
+ */
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+ throws KeeperException, InterruptedException {
+ PathUtils.validatePath(basePath);
+ String serverPath = prependChroot(basePath);
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.addWatch);
+ AddWatchRequest request = new AddWatchRequest(serverPath, mode.getMode());
+ ReplyHeader r = cnxn.submitRequest(h, request, new ErrorResponse(),
+ new AddWatchRegistration(watcher, basePath, mode));
+ if (r.getErr() != 0) {
+ throw KeeperException.create(KeeperException.Code.get(r.getErr()),
+ basePath);
+ }
+ }
+
+ /**
+ * Add a watch to the given znode using the given mode. Note: not all
+ * watch types can be set with this method. Only the modes available
+ * in {@link AddWatchMode} can be set with this method. In this version of the method,
+ * the default watcher is used
+ *
+ * @param basePath the path that the watcher applies to
+ * @param mode type of watcher to add
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws KeeperException If the server signals an error with a non-zero
+ * error code.
+ * @since 3.6.0
+ */
+ public void addWatch(String basePath, AddWatchMode mode)
+ throws KeeperException, InterruptedException {
+ addWatch(basePath, watchManager.defaultWatcher, mode);
+ }
+
+ /**
+ * Async version of {@link #addWatch(String, Watcher, AddWatchMode)} (see it for details)
+ *
+ * @param basePath the path that the watcher applies to
+ * @param watcher the watcher
+ * @param mode type of watcher to add
+ * @param cb a handler for the callback
+ * @param ctx context to be provided to the callback
+ * @throws IllegalArgumentException if an invalid path is specified
+ * @since 3.6.0
+ */
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode,
+ VoidCallback cb, Object ctx) {
+ PathUtils.validatePath(basePath);
+ String serverPath = prependChroot(basePath);
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.addWatch);
+ AddWatchRequest request = new AddWatchRequest(serverPath, mode.getMode());
+ cnxn.queuePacket(h, new ReplyHeader(), request, new ErrorResponse(), cb,
+ basePath, serverPath, ctx, new AddWatchRegistration(watcher, basePath, mode));
+ }
+
+ /**
+ * Async version of {@link #addWatch(String, AddWatchMode)} (see it for details)
+ *
+ * @param basePath the path that the watcher applies to
+ * @param mode type of watcher to add
+ * @param cb a handler for the callback
+ * @param ctx context to be provided to the callback
+ * @throws IllegalArgumentException if an invalid path is specified
+ * @since 3.6.0
+ */
+ public void addWatch(String basePath, AddWatchMode mode,
+ VoidCallback cb, Object ctx) {
+ addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx);
+ }
+
private void validateWatcher(Watcher watcher) {
if (watcher == null) {
throw new IllegalArgumentException("Invalid Watcher, shouldn't be null!");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java
index c9f49a1..857b16f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.cli.AddAuthCommand;
+import org.apache.zookeeper.cli.AddWatchCommand;
import org.apache.zookeeper.cli.CliCommand;
import org.apache.zookeeper.cli.CliException;
import org.apache.zookeeper.cli.CloseCommand;
@@ -123,6 +124,7 @@ public class ZooKeeperMain {
new GetEphemeralsCommand().addToMap(commandMapCli);
new GetAllChildrenNumberCommand().addToMap(commandMapCli);
new VersionCommand().addToMap(commandMapCli);
+ new AddWatchCommand().addToMap(commandMapCli);
// add all to commandMap
for (Entry<String, CliCommand> entry : commandMapCli.entrySet()) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/AddWatchCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/AddWatchCommand.java
new file mode 100644
index 0000000..1e34b10
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/AddWatchCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.cli;
+
+import java.util.Arrays;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * addWatch command for cli.
+ * Matches the ZooKeeper API addWatch()
+ */
+public class AddWatchCommand extends CliCommand {
+
+ private static final Options options = new Options();
+ private static final AddWatchMode defaultMode =
AddWatchMode.PERSISTENT_RECURSIVE;
+
+ private CommandLine cl;
+ private AddWatchMode mode = defaultMode;
+
+ static {
+ options.addOption("m", true, "");
+ }
+
+ public AddWatchCommand() {
+ super("addWatch", "[-m mode] path # optional mode is one of "
+ + Arrays.toString(AddWatchMode.values()) + " - default is " +
defaultMode.name());
+ }
+
+ @Override
+ public CliCommand parse(String[] cmdArgs) throws CliParseException {
+ Parser parser = new PosixParser();
+ try {
+ cl = parser.parse(options, cmdArgs);
+ } catch (ParseException ex) {
+ throw new CliParseException(ex);
+ }
+ if (cl.getArgs().length != 2) {
+ throw new CliParseException(getUsageStr());
+ }
+
+ if (cl.hasOption("m")) {
+ try {
+ mode =
AddWatchMode.valueOf(cl.getOptionValue("m").toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new CliParseException(getUsageStr());
+ }
+ }
+
+ return this;
+ }
+
+ @Override
+ public boolean exec() throws CliException {
+ String path = cl.getArgs()[1];
+ try {
+ zk.addWatch(path, mode);
+ } catch (KeeperException | InterruptedException ex) {
+ throw new CliWrapperException(ex);
+ }
+
+ return false;
+
+ }
+
+}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index a9b08b0..766949e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -58,6 +58,7 @@ import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.server.watch.IWatchManager;
import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherMode;
import org.apache.zookeeper.server.watch.WatcherOrBitSet;
import org.apache.zookeeper.server.watch.WatchesPathReport;
import org.apache.zookeeper.server.watch.WatchesReport;
@@ -701,6 +702,12 @@ public class DataTree {
}
}
+ public void addWatch(String basePath, Watcher watcher, int mode) {
+ WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
+ dataWatches.addWatch(basePath, watcher, watcherMode);
+ childWatches.addWatch(basePath, watcher, watcherMode);
+ }
+
public byte[] getData(String path, Stat stat, Watcher watcher) throws
KeeperException.NoNodeException {
DataNode n = nodes.get(path);
byte[] data = null;
@@ -1499,7 +1506,8 @@ public class DataTree {
childWatches.removeWatcher(watcher);
}
- public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches, Watcher watcher) {
+ public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
+ List<String> persistentWatches, List<String>
persistentRecursiveWatches, Watcher watcher) {
for (String path : dataWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
@@ -1529,6 +1537,14 @@ public class DataTree {
this.childWatches.addWatch(path, watcher);
}
}
+ for (String path : persistentWatches) {
+ this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
+ this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
+ }
+ for (String path : persistentRecursiveWatches) {
+ this.childWatches.addWatch(path, watcher,
WatcherMode.PERSISTENT_RECURSIVE);
+ this.dataWatches.addWatch(path, watcher,
WatcherMode.PERSISTENT_RECURSIVE);
+ }
}
/**
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index 93d70d8..79a6977 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -47,9 +48,11 @@ import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.ErrorResponse;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLRequest;
@@ -69,6 +72,7 @@ import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.proto.SetWatches2;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -365,7 +369,7 @@ public class FinalRequestProcessor implements
RequestProcessor {
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
- // TODO We really should NOT need this!!!!
+ // TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request,
setWatches);
long relativeZxid = setWatches.getRelativeZxid();
@@ -375,9 +379,36 @@ public class FinalRequestProcessor implements
RequestProcessor {
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
+ Collections.emptyList(),
+ Collections.emptyList(),
cnxn);
break;
}
+ case OpCode.setWatches2: {
+ lastOp = "STW2";
+ SetWatches2 setWatches = new SetWatches2();
+ // TODO we really should not need this
+ request.request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request.request,
setWatches);
+ long relativeZxid = setWatches.getRelativeZxid();
+ zks.getZKDatabase().setWatches(relativeZxid,
+ setWatches.getDataWatches(),
+ setWatches.getExistWatches(),
+ setWatches.getChildWatches(),
+ setWatches.getPersistentWatches(),
+ setWatches.getPersistentRecursiveWatches(),
+ cnxn);
+ break;
+ }
+ case OpCode.addWatch: {
+ lastOp = "ADDW";
+ AddWatchRequest addWatcherRequest = new AddWatchRequest();
+ ByteBufferInputStream.byteBuffer2Record(request.request,
+ addWatcherRequest);
+ zks.getZKDatabase().addWatch(addWatcherRequest.getPath(),
cnxn, addWatcherRequest.getMode());
+ rsp = new ErrorResponse(0);
+ break;
+ }
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index c393684..70d989a 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -797,10 +797,12 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements Req
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
+ case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
+ case OpCode.addWatch:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index bab2194..122f0ca 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -243,9 +243,11 @@ public class Request {
case OpCode.setACL:
case OpCode.setData:
case OpCode.setWatches:
+ case OpCode.setWatches2:
case OpCode.sync:
case OpCode.checkWatches:
case OpCode.removeWatches:
+ case OpCode.addWatch:
return true;
default:
return false;
@@ -334,6 +336,8 @@ public class Request {
return "auth";
case OpCode.setWatches:
return "setWatches";
+ case OpCode.setWatches2:
+ return "setWatches2";
case OpCode.sasl:
return "sasl";
case OpCode.getEphemerals:
@@ -364,6 +368,7 @@ public class Request {
String path = "n/a";
if (type != OpCode.createSession
&& type != OpCode.setWatches
+ && type != OpCode.setWatches2
&& type != OpCode.closeSession
&& request != null
&& request.remaining() >= 4) {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index a753b8a..c9c6d54 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -516,10 +516,27 @@ public class ZKDatabase {
* @param dataWatches the data watches the client wants to reset
* @param existWatches the exists watches the client wants to reset
* @param childWatches the child watches the client wants to reset
+ * @param persistentWatches the persistent watches the client wants to
reset
+ * @param persistentRecursiveWatches the persistent recursive watches the
client wants to reset
* @param watcher the watcher function
*/
- public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches, Watcher watcher) {
- dataTree.setWatches(relativeZxid, dataWatches, existWatches,
childWatches, watcher);
+ public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
+ List<String> persistentWatches, List<String>
persistentRecursiveWatches, Watcher watcher) {
+ dataTree.setWatches(relativeZxid, dataWatches, existWatches,
childWatches, persistentWatches, persistentRecursiveWatches, watcher);
+ }
+
+ /**
+ * Add a watch
+ *
+ * @param basePath
+ * watch base
+ * @param watcher
+ * the watcher
+ * @param mode
+ * a mode from ZooDefs.AddWatchModes
+ */
+ public void addWatch(String basePath, Watcher watcher, int mode) {
+ dataTree.addWatch(basePath, watcher, mode);
}
/**
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
index 9ef430c..f3ec1fc 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
@@ -32,6 +32,7 @@ import static org.apache.zookeeper.ZooDefs.OpCode.getData;
import static org.apache.zookeeper.ZooDefs.OpCode.removeWatches;
import static org.apache.zookeeper.ZooDefs.OpCode.setACL;
import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+import static org.apache.zookeeper.ZooDefs.OpCode.setWatches2;
import static org.apache.zookeeper.ZooDefs.OpCode.sync;
import java.io.PrintWriter;
import java.util.Arrays;
@@ -131,6 +132,7 @@ public class RequestPathMetricsCollector {
requestsMap.put(Request.op2String(getChildren2), new
PathStatsQueue(getChildren2));
requestsMap.put(Request.op2String(checkWatches), new
PathStatsQueue(checkWatches));
requestsMap.put(Request.op2String(removeWatches), new
PathStatsQueue(removeWatches));
+ requestsMap.put(Request.op2String(setWatches2), new
PathStatsQueue(setWatches2));
requestsMap.put(Request.op2String(sync), new PathStatsQueue(sync));
this.immutableRequestsMap =
java.util.Collections.unmodifiableMap(requestsMap);
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
index 286c7db..1bc44c8 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -35,6 +35,22 @@ public interface IWatchManager {
boolean addWatch(String path, Watcher watcher);
/**
+ * Add watch to specific path.
+ *
+ * @param path znode path
+ * @param watcher watcher object reference
+ * @param watcherMode the watcher mode to use
+ *
+ * @return true if the watcher added is not already present
+ */
+ default boolean addWatch(String path, Watcher watcher, WatcherMode
watcherMode) {
+ if (watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
+ return addWatch(path, watcher);
+ }
+ throw new UnsupportedOperationException(); // custom implementations must override this
+ }
+
+ /**
* Checks the specified watcher exists for the given path.
*
* @param path znode path
@@ -129,4 +145,12 @@ public interface IWatchManager {
*/
void dumpWatches(PrintWriter pwriter, boolean byPath);
+ /**
+ * Return the current number of recursive watchers.
+ *
+ * @return the number of recursive watchers
+ */
+ default int getRecursiveWatchQty() {
+ return 0;
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/PathParentIterator.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/PathParentIterator.java
new file mode 100644
index 0000000..a6aa8cd
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/PathParentIterator.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over a ZooKeeper path. Each iteration goes up one parent path. Thus, the
+ * effect of the iterator is to iterate over the initial path and then all of its parents.
+ */
+public class PathParentIterator implements Iterator<String> {
+ private String path;
+ private final int maxLevel;
+ private int level = -1;
+
+ /**
+ * Return a new PathParentIterator that iterates from the
+ * given path to all parents.
+ *
+ * @param path initial path
+ */
+ public static PathParentIterator forAll(String path) {
+ return new PathParentIterator(path, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Return a new PathParentIterator that only returns the given path - i.e.
+ * does not iterate to parent paths.
+ *
+ * @param path initial path
+ */
+ public static PathParentIterator forPathOnly(String path) {
+ return new PathParentIterator(path, 0);
+ }
+
+ private PathParentIterator(String path, int maxLevel) {
+ // NOTE: assumes that the path has already been validated
+ this.path = path;
+ this.maxLevel = maxLevel;
+ }
+
+ /**
+ * Return an Iterable view so that this Iterator can be used in for-each
+ * statements. IMPORTANT: the returned Iterable is single-use only.
+ * @return Iterable
+ */
+ public Iterable<String> asIterable() {
+ return () -> PathParentIterator.this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !path.isEmpty() && (level < maxLevel);
+ }
+
+ /**
+ * Returns true if this iterator is currently at a parent path as opposed
+ * to the initial path given to the constructor
+ *
+ * @return true/false
+ */
+ public boolean atParentPath() {
+ return level > 0;
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ String localPath = path;
+ ++level;
+ if (path.equals("/")) {
+ path = "";
+ } else {
+ path = path.substring(0, path.lastIndexOf('/'));
+ if (path.length() == 0) {
+ path = "/";
+ }
+ }
+ return localPath;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
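
For readers following the diff, the walk that `PathParentIterator` performs can be sketched in isolation. This is only an illustrative sketch: the class and method names (`ParentPathWalk`, `walk`) are hypothetical and not part of the commit, and it assumes, as the iterator itself does, that the path is an already validated absolute ZooKeeper path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the commit.
public class ParentPathWalk {
    // Returns the given path followed by each of its parents, ending at "/".
    // Assumes the path is a validated absolute path such as "/a/b/c".
    public static List<String> walk(String path) {
        List<String> result = new ArrayList<>();
        String current = path;
        while (!current.isEmpty()) {
            result.add(current);
            if (current.equals("/")) {
                current = ""; // the root has no parent, so the walk ends
            } else {
                current = current.substring(0, current.lastIndexOf('/'));
                if (current.isEmpty()) {
                    current = "/"; // the parent of a top-level node is the root
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(walk("/a/b/c")); // prints [/a/b/c, /a/b, /a, /]
    }
}
```

The eager list is a simplification for illustration; the iterator in the diff produces the same sequence lazily and can be capped at the initial path via `forPathOnly`.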
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
index 39327fc..c5b1330 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.watch;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -42,9 +43,11 @@ public class WatchManager implements IWatchManager {
private static final Logger LOG =
LoggerFactory.getLogger(WatchManager.class);
- private final Map<String, Set<Watcher>> watchTable = new HashMap<String,
Set<Watcher>>();
+ private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
- private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher,
Set<String>>();
+ private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
+
+ private final WatcherModeManager watcherModeManager = new
WatcherModeManager();
@Override
public synchronized int size() {
@@ -55,12 +58,17 @@ public class WatchManager implements IWatchManager {
return result;
}
- boolean isDeadWatcher(Watcher watcher) {
+ private boolean isDeadWatcher(Watcher watcher) {
return watcher instanceof ServerCnxn && ((ServerCnxn)
watcher).isStale();
}
@Override
- public synchronized boolean addWatch(String path, Watcher watcher) {
+ public boolean addWatch(String path, Watcher watcher) {
+ return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
+ }
+
+ @Override
+ public synchronized boolean addWatch(String path, Watcher watcher,
WatcherMode watcherMode) {
if (isDeadWatcher(watcher)) {
LOG.debug("Ignoring addWatch with closed cnxn");
return false;
@@ -71,7 +79,7 @@ public class WatchManager implements IWatchManager {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
- list = new HashSet<Watcher>(4);
+ list = new HashSet<>(4);
watchTable.put(path, list);
}
list.add(watcher);
@@ -79,9 +87,12 @@ public class WatchManager implements IWatchManager {
Set<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
- paths = new HashSet<String>();
+ paths = new HashSet<>();
watch2Paths.put(watcher, paths);
}
+
+ watcherModeManager.setWatcherMode(watcher, path, watcherMode);
+
return paths.add(path);
}
@@ -99,6 +110,7 @@ public class WatchManager implements IWatchManager {
watchTable.remove(p);
}
}
+ watcherModeManager.removeWatcher(watcher, p);
}
}
@@ -110,22 +122,45 @@ public class WatchManager implements IWatchManager {
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type,
WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected,
path);
- Set<Watcher> watchers;
+ Set<Watcher> watchers = new HashSet<>();
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
- watchers = watchTable.remove(path);
- if (watchers == null || watchers.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
+ for (String localPath : pathParentIterator.asIterable()) {
+ Set<Watcher> thisWatchers = watchTable.get(localPath);
+ if (thisWatchers == null || thisWatchers.isEmpty()) {
+ continue;
}
- return null;
- }
- for (Watcher w : watchers) {
- Set<String> paths = watch2Paths.get(w);
- if (paths != null) {
- paths.remove(path);
+ Iterator<Watcher> iterator = thisWatchers.iterator();
+ while (iterator.hasNext()) {
+ Watcher watcher = iterator.next();
+ WatcherMode watcherMode =
watcherModeManager.getWatcherMode(watcher, localPath);
+ if (watcherMode.isRecursive()) {
+ if (type != EventType.NodeChildrenChanged) {
+ watchers.add(watcher);
+ }
+ } else if (!pathParentIterator.atParentPath()) {
+ watchers.add(watcher);
+ if (!watcherMode.isPersistent()) {
+ iterator.remove();
+ Set<String> paths = watch2Paths.get(watcher);
+ if (paths != null) {
+ paths.remove(localPath);
+ }
+ }
+ }
+ }
+ if (thisWatchers.isEmpty()) {
+ watchTable.remove(localPath);
}
}
}
+ if (watchers.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
+ }
+ return null;
+ }
+
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
@@ -134,24 +169,24 @@ public class WatchManager implements IWatchManager {
}
switch (type) {
- case NodeCreated:
-
ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
- break;
-
- case NodeDeleted:
-
ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
- break;
-
- case NodeDataChanged:
-
ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
- break;
-
- case NodeChildrenChanged:
-
ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
- break;
- default:
- // Other types not logged.
- break;
+ case NodeCreated:
+
ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
+ break;
+
+ case NodeDeleted:
+
ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
+ break;
+
+ case NodeDataChanged:
+
ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
+ break;
+
+ case NodeChildrenChanged:
+
ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
+ break;
+ default:
+ // Other types not logged.
+ break;
}
return new WatcherOrBitSet(watchers);
@@ -197,8 +232,20 @@ public class WatchManager implements IWatchManager {
@Override
public synchronized boolean containsWatcher(String path, Watcher watcher) {
- Set<String> paths = watch2Paths.get(watcher);
- return paths != null && paths.contains(path);
+ WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher,
path);
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
+ for (String localPath : pathParentIterator.asIterable()) {
+ Set<Watcher> watchers = watchTable.get(localPath);
+ if (!pathParentIterator.atParentPath()) {
+ if (watchers != null) {
+ return true; // at the leaf node, all watcher types match
+ }
+ }
+ if (watcherMode.isRecursive()) {
+ return true;
+ }
+ }
+ return false;
}
@Override
@@ -217,15 +264,17 @@ public class WatchManager implements IWatchManager {
watchTable.remove(path);
}
+ watcherModeManager.removeWatcher(watcher, path);
+
return true;
}
@Override
public synchronized WatchesReport getWatches() {
- Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+ Map<Long, Set<String>> id2paths = new HashMap<>();
for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
Long id = ((ServerCnxn) e.getKey()).getSessionId();
- Set<String> paths = new HashSet<String>(e.getValue());
+ Set<String> paths = new HashSet<>(e.getValue());
id2paths.put(id, paths);
}
return new WatchesReport(id2paths);
@@ -233,9 +282,9 @@ public class WatchManager implements IWatchManager {
@Override
public synchronized WatchesPathReport getWatchesByPath() {
- Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+ Map<String, Set<Long>> path2ids = new HashMap<>();
for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
- Set<Long> ids = new HashSet<Long>(e.getValue().size());
+ Set<Long> ids = new HashSet<>(e.getValue().size());
path2ids.put(e.getKey(), ids);
for (Watcher watcher : e.getValue()) {
ids.add(((ServerCnxn) watcher).getSessionId());
@@ -256,4 +305,15 @@ public class WatchManager implements IWatchManager {
@Override
public void shutdown() { /* do nothing */ }
+ @Override
+ public int getRecursiveWatchQty() {
+ return watcherModeManager.getRecursiveQty();
+ }
+
+ private PathParentIterator getPathParentIterator(String path) {
+ if (watcherModeManager.getRecursiveQty() == 0) {
+ return PathParentIterator.forPathOnly(path);
+ }
+ return PathParentIterator.forAll(path);
+ }
}
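
The dispatch rules in the reworked `triggerWatch` can be modelled in a few lines. The following is a simplified sketch under stated assumptions, not the committed implementation: watchers are plain strings, the class `TriggerSketch` and its methods are hypothetical, locking and metrics are omitted, and parents are always walked (the real code walks them only when at least one recursive watcher is registered).

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified model of the trigger rules; not part of the commit.
public class TriggerSketch {
    public enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // path -> (watcher name -> mode)
    private final Map<String, Map<String, Mode>> table = new HashMap<>();

    public void add(String path, String watcher, Mode mode) {
        table.computeIfAbsent(path, p -> new HashMap<>()).put(watcher, mode);
    }

    // Returns the watchers notified by an event on the given path.
    // childrenChanged stands in for the NodeChildrenChanged event type.
    public Set<String> trigger(String path, boolean childrenChanged) {
        Set<String> fired = new LinkedHashSet<>();
        boolean atParent = false;
        String current = path;
        while (!current.isEmpty()) {
            Map<String, Mode> watchers = table.get(current);
            if (watchers != null) {
                Iterator<Map.Entry<String, Mode>> it = watchers.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Mode> e = it.next();
                    if (e.getValue() == Mode.PERSISTENT_RECURSIVE) {
                        // recursive watchers skip child-list events
                        if (!childrenChanged) {
                            fired.add(e.getKey());
                        }
                    } else if (!atParent) {
                        fired.add(e.getKey());
                        if (e.getValue() == Mode.STANDARD) {
                            it.remove(); // standard watches are one-shot
                        }
                    }
                }
                if (watchers.isEmpty()) {
                    table.remove(current);
                }
            }
            // step up to the parent path, stopping after the root
            atParent = true;
            if (current.equals("/")) {
                current = "";
            } else {
                int i = current.lastIndexOf('/');
                current = (i == 0) ? "/" : current.substring(0, i);
            }
        }
        return fired;
    }
}
```

With a recursive watcher on `/a` plus a standard and a persistent watcher on `/a/b`, a first data event on `/a/b` notifies all three, and a second one notifies only the persistent and recursive watchers because the standard watch has been consumed.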
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
new file mode 100644
index 0000000..b8a1dda
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.ZooDefs;
+
+public enum WatcherMode {
+ STANDARD(false, false),
+ PERSISTENT(true, false),
+ PERSISTENT_RECURSIVE(true, true)
+ ;
+
+ public static final WatcherMode DEFAULT_WATCHER_MODE =
WatcherMode.STANDARD;
+
+ public static WatcherMode fromZooDef(int mode) {
+ switch (mode) {
+ case ZooDefs.AddWatchModes.persistent:
+ return PERSISTENT;
+ case ZooDefs.AddWatchModes.persistentRecursive:
+ return PERSISTENT_RECURSIVE;
+ }
+ throw new IllegalArgumentException("Unsupported mode: " + mode);
+ }
+
+ private final boolean isPersistent;
+ private final boolean isRecursive;
+
+ WatcherMode(boolean isPersistent, boolean isRecursive) {
+ this.isPersistent = isPersistent;
+ this.isRecursive = isRecursive;
+ }
+
+ public boolean isPersistent() {
+ return isPersistent;
+ }
+
+ public boolean isRecursive() {
+ return isRecursive;
+ }
+}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
new file mode 100644
index 0000000..c1a8225
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.Watcher;
+
+class WatcherModeManager {
+ private final Map<Key, WatcherMode> watcherModes = new
ConcurrentHashMap<>();
+ private final AtomicInteger recursiveQty = new AtomicInteger(0);
+
+ private static class Key {
+ private final Watcher watcher;
+ private final String path;
+
+ Key(Watcher watcher, String path) {
+ this.watcher = watcher;
+ this.path = path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Key key = (Key) o;
+ return watcher.equals(key.watcher) && path.equals(key.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(watcher, path);
+ }
+ }
+
+ // VisibleForTesting
+ Map<Key, WatcherMode> getWatcherModes() {
+ return watcherModes;
+ }
+
+ void setWatcherMode(Watcher watcher, String path, WatcherMode mode) {
+ if (mode == WatcherMode.DEFAULT_WATCHER_MODE) {
+ removeWatcher(watcher, path);
+ } else {
+ adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode),
mode);
+ }
+ }
+
+ WatcherMode getWatcherMode(Watcher watcher, String path) {
+ return watcherModes.getOrDefault(new Key(watcher, path),
WatcherMode.DEFAULT_WATCHER_MODE);
+ }
+
+ void removeWatcher(Watcher watcher, String path) {
+ adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)),
WatcherMode.DEFAULT_WATCHER_MODE);
+ }
+
+ int getRecursiveQty() {
+ return recursiveQty.get();
+ }
+
+ // recursiveQty is an optimization to avoid having to walk the map every time this value is needed
+ private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) {
+ if (oldMode == null) {
+ oldMode = WatcherMode.DEFAULT_WATCHER_MODE;
+ }
+ if (oldMode.isRecursive() != newMode.isRecursive()) {
+ if (newMode.isRecursive()) {
+ recursiveQty.incrementAndGet();
+ } else {
+ recursiveQty.decrementAndGet();
+ }
+ }
+ }
+}
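
The bookkeeping idea behind `WatcherModeManager` (store only non-standard modes, treat a missing entry as standard, and keep a running count of recursive entries) can be sketched on its own. Everything below is an illustrative assumption rather than the committed code: the class `ModeBookkeepingSketch` is hypothetical, watchers are plain strings, and a non-concurrent map with an `int` counter replaces the `ConcurrentHashMap` and `AtomicInteger` used in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified model; not part of the commit and not thread-safe.
public class ModeBookkeepingSketch {
    public enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // Only non-standard modes are stored; a missing key means STANDARD.
    private final Map<String, Mode> modes = new HashMap<>();
    private int recursiveQty = 0;

    // Joins watcher id and path; assumes watcher ids contain no spaces.
    private static String key(String watcher, String path) {
        return watcher + " " + path;
    }

    public void set(String watcher, String path, Mode mode) {
        Mode old = (mode == Mode.STANDARD)
                ? modes.remove(key(watcher, path))
                : modes.put(key(watcher, path), mode);
        adjust(old, mode);
    }

    public Mode get(String watcher, String path) {
        return modes.getOrDefault(key(watcher, path), Mode.STANDARD);
    }

    public int recursiveQty() {
        return recursiveQty;
    }

    // Changes the counter only when an entry enters or leaves recursive mode.
    private void adjust(Mode oldMode, Mode newMode) {
        boolean wasRecursive = oldMode == Mode.PERSISTENT_RECURSIVE;
        boolean isRecursive = newMode == Mode.PERSISTENT_RECURSIVE;
        if (wasRecursive != isRecursive) {
            recursiveQty += isRecursive ? 1 : -1;
        }
    }
}
```

Keeping the counter alongside the map lets a caller decide cheaply whether parent paths need to be walked at all, which is how `getPathParentIterator` in `WatchManager` uses `getRecursiveQty()`.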
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/PathParentIteratorTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/PathParentIteratorTest.java
new file mode 100644
index 0000000..59bb17a
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/PathParentIteratorTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PathParentIteratorTest {
+ @Test
+ public void testRoot() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertFalse(pathParentIterator.atParentPath());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void test1Level() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/a");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertFalse(pathParentIterator.atParentPath());
+ Assert.assertEquals(pathParentIterator.next(), "/a");
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void testLong() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/a/b/c/d");
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c/d");
+ Assert.assertFalse(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void testForPathOnly() {
+ PathParentIterator pathParentIterator = PathParentIterator.forPathOnly("/a/b/c/d");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c/d");
+ Assert.assertFalse(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+}
\ No newline at end of file
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
new file mode 100644
index 0000000..067cb2a
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RecursiveWatchQtyTest {
+ private WatchManager watchManager;
+
+ private static final int clientQty = 25;
+ private static final int iterations = 1000;
+
+ private static class DummyWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent event) {
+ // NOP
+ }
+ }
+
+ @Before
+ public void setup() {
+ watchManager = new WatchManager();
+ }
+
+ @Test
+ public void testRecursiveQty() {
+ WatcherModeManager manager = new WatcherModeManager();
+ DummyWatcher watcher = new DummyWatcher();
+ manager.setWatcherMode(watcher, "/a", WatcherMode.DEFAULT_WATCHER_MODE);
+ assertEquals(0, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(1, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(2, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(2, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT);
+ assertEquals(1, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(2, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a/b", WatcherMode.DEFAULT_WATCHER_MODE);
+ assertEquals(1, manager.getRecursiveQty());
+ manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT);
+ assertEquals(0, manager.getRecursiveQty());
+ }
+
+ @Test
+ public void testAddRemove() {
+ Watcher watcher1 = new DummyWatcher();
+ Watcher watcher2 = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher1, WatcherMode.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/b", watcher2, WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(2, watchManager.getRecursiveWatchQty());
+ assertTrue(watchManager.removeWatcher("/a", watcher1));
+ assertTrue(watchManager.removeWatcher("/b", watcher2));
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testAddRemoveAlt() {
+ Watcher watcher1 = new DummyWatcher();
+ Watcher watcher2 = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher1, WatcherMode.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/b", watcher2, WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(2, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher1);
+ watchManager.removeWatcher(watcher2);
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testDoubleAdd() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(1, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher);
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testSameWatcherMultiPath() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a/b", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a/b/c", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(3, watchManager.getRecursiveWatchQty());
+ assertTrue(watchManager.removeWatcher("/a/b", watcher));
+ assertEquals(2, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher);
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testChangeType() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT);
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+ assertEquals(1, watchManager.getRecursiveWatchQty());
+ watchManager.addWatch("/a", watcher, WatcherMode.STANDARD);
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ assertTrue(watchManager.removeWatcher("/a", watcher));
+ assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testRecursiveQtyConcurrency() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ WatcherModeManager manager = new WatcherModeManager();
+ ExecutorService threadPool = Executors.newFixedThreadPool(clientQty);
+ List<Future<?>> tasks = null;
+ CountDownLatch completedLatch = new CountDownLatch(clientQty);
+ try {
+ tasks = IntStream.range(0, clientQty)
+ .mapToObj(__ -> threadPool.submit(() -> iterate(manager, completedLatch)))
+ .collect(Collectors.toList());
+ try {
+ completedLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ if (tasks != null) {
+ tasks.forEach(t -> t.cancel(true));
+ }
+ threadPool.shutdownNow();
+ }
+
+ int expectedRecursiveQty = (int) manager.getWatcherModes().values()
+ .stream()
+ .filter(mode -> mode == WatcherMode.PERSISTENT_RECURSIVE)
+ .count();
+ assertEquals(expectedRecursiveQty, manager.getRecursiveQty());
+ }
+
+ private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ try {
+ for (int i = 0; i < iterations; ++i) {
+ String path = "/" + random.nextInt(clientQty);
+ boolean doSet = random.nextInt(100) > 33; // 2/3 will be sets
+ if (doSet) {
+ WatcherMode mode = WatcherMode.values()[random.nextInt(WatcherMode.values().length)];
+ manager.setWatcherMode(new DummyWatcher(), path, mode);
+ } else {
+ manager.removeWatcher(new DummyWatcher(), path);
+ }
+
+ int sleepMillis = random.nextInt(2);
+ if (sleepMillis > 0) {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } finally {
+ completedLatch.countDown();
+ }
+ }
+}
\ No newline at end of file
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
index 0ce0a59..e29dab9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -49,7 +49,7 @@ public class WatchManagerTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
- private static final String PATH_PREFIX = "path";
+ private static final String PATH_PREFIX = "/path";
private ConcurrentHashMap<Integer, DumbWatcher> watchers;
private Random r;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
new file mode 100644
index 0000000..67f19dc
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PersistentRecursiveWatcherTest extends ClientBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class);
+ private BlockingQueue<WatchedEvent> events;
+ private Watcher persistentWatcher;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ events = new LinkedBlockingQueue<>();
+ persistentWatcher = event -> events.add(event);
+ }
+
+ @Test
+ public void testBasic()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testBasicAsync()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+ if (rc == 0) {
+ latch.countDown();
+ }
+ };
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE, cb, null);
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ internalTestBasic(zk);
+ }
+ }
+
+ private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a/b/c/d/e", new byte[0], -1);
+ zk.delete("/a/b/c/d/e", -1);
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+ }
+
+ @Test
+ public void testRemoval()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+
+ zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+ zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+ }
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+ stopServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ startServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testMultiClient()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort); ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {
+
+ zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+ zk1.setData("/a/b/c", "one".getBytes(), -1);
+ Thread.sleep(1000); // give some time for the event to arrive
+
+ zk2.setData("/a/b/c", "two".getBytes(), -1);
+ zk2.setData("/a/b/c", "three".getBytes(), -1);
+ zk2.setData("/a/b/c", "four".getBytes(), -1);
+
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ }
+ }
+
+ @Test
+ public void testRootWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c");
+ }
+ }
+
+ private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
+ throws InterruptedException {
+ WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(eventType, event.getType());
+ Assert.assertEquals(path, event.getPath());
+ }
+}
\ No newline at end of file
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentWatcherTest.java
new file mode 100644
index 0000000..bffa8e0
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentWatcherTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PersistentWatcherTest extends ClientBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentWatcherTest.class);
+ private BlockingQueue<WatchedEvent> events;
+ private Watcher persistentWatcher;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ events = new LinkedBlockingQueue<>();
+ persistentWatcher = event -> events.add(event);
+ }
+
+ @Test
+ public void testBasic()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testDefaultWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ CountdownWatcher watcher = new CountdownWatcher() {
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ super.process(event);
+ events.add(event);
+ }
+ };
+ try (ZooKeeper zk = createClient(watcher, hostPort)) {
+ zk.addWatch("/a/b", PERSISTENT);
+ events.clear(); // clear any events added during client connection
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testBasicAsync()
+ throws IOException, InterruptedException, KeeperException {
+ CountdownWatcher watcher = new CountdownWatcher() {
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ super.process(event);
+ events.add(event);
+ }
+ };
+ try (ZooKeeper zk = createClient(watcher, hostPort)) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+ if (rc == 0) {
+ latch.countDown();
+ }
+ };
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT, cb, null);
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ events.clear(); // clear any events added during client connection
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testAsyncDefaultWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+ if (rc == 0) {
+ latch.countDown();
+ }
+ };
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT, cb, null);
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ internalTestBasic(zk);
+ }
+ }
+
+ private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a/b", new byte[0], -1);
+ zk.delete("/a/b/c", -1);
+ zk.delete("/a/b", -1);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ }
+
+ @Test
+ public void testRemoval()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+
+ zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+ zk.delete("/a/b/c", -1);
+ zk.delete("/a/b", -1);
+ assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+ }
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+ stopServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ startServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testMultiClient()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort);
+ ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {
+
+ zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk1.addWatch("/a/b", persistentWatcher, PERSISTENT);
+ zk1.setData("/a/b", "one".getBytes(), -1);
+ Thread.sleep(1000); // give some time for the event to arrive
+
+ zk2.setData("/a/b", "two".getBytes(), -1);
+ zk2.setData("/a/b", "three".getBytes(), -1);
+ zk2.setData("/a/b", "four".getBytes(), -1);
+
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ }
+ }
+
+ @Test
+ public void testRootWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+ zk.addWatch("/", persistentWatcher, PERSISTENT);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a", new byte[0], -1);
+ zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+ }
+ }
+
+ private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
+ throws InterruptedException {
+ WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(eventType, event.getType());
+ Assert.assertEquals(path, event.getPath());
+ }
+}
\ No newline at end of file
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
new file mode 100644
index 0000000..95b5569
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collections;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherOrBitSet;
+import org.apache.zookeeper.server.watch.WatchesPathReport;
+import org.apache.zookeeper.server.watch.WatchesReport;
+import org.apache.zookeeper.server.watch.WatchesSummary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnsupportedAddWatcherTest extends ClientBase {
+
+ public static class StubbedWatchManager implements IWatchManager {
+ @Override
+ public boolean addWatch(String path, Watcher watcher) {
+ return false;
+ }
+
+ @Override
+ public boolean containsWatcher(String path, Watcher watcher) {
+ return false;
+ }
+
+ @Override
+ public boolean removeWatcher(String path, Watcher watcher) {
+ return false;
+ }
+
+ @Override
+ public void removeWatcher(Watcher watcher) {
+ // NOP
+ }
+
+ @Override
+ public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) {
+ return new WatcherOrBitSet(Collections.emptySet());
+ }
+
+ @Override
+ public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) {
+ return new WatcherOrBitSet(Collections.emptySet());
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public void shutdown() {
+ // NOP
+ }
+
+ @Override
+ public WatchesSummary getWatchesSummary() {
+ return null;
+ }
+
+ @Override
+ public WatchesReport getWatches() {
+ return null;
+ }
+
+ @Override
+ public WatchesPathReport getWatchesByPath() {
+ return null;
+ }
+
+ @Override
+ public void dumpWatches(PrintWriter pwriter, boolean byPath) {
+ // NOP
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, StubbedWatchManager.class.getName());
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ } finally {
+ System.clearProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME);
+ }
+ }
+
+ @Test(expected = KeeperException.MarshallingErrorException.class)
+ public void testBehavior() throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient(hostPort)) {
+ // the server will generate an exception as our custom watch manager doesn't implement
+ // the new version of addWatch()
+ zk.addWatch("/foo", event -> {}, AddWatchMode.PERSISTENT_RECURSIVE);
+ }
+ }
+}