This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e8b2fbeb1 ZOOKEEPER-4472: Remove persistent watches individually
(#2006)
e8b2fbeb1 is described below
commit e8b2fbeb101e22378aa47df291cc9ca6e58e1e8d
Author: Kezhu Wang <[email protected]>
AuthorDate: Thu Jun 15 13:14:22 2023 +0800
ZOOKEEPER-4472: Remove persistent watches individually (#2006)
* ZOOKEEPER-4472: Remove persistent watches individually
ZOOKEEPER-4466 supports different watch modes on the same path, but there
are no corresponding `WatcherType`s for persistent watches. Clients have
to resort to `WatcherType.Any` to remove them. This could accidentally
interrupt other watches.
This PR adds `WatcherType.Persistent` and `WatcherType.PersistentRecursive`
to remove persistent watches individually.
* Assert removing WatcherType::Data from AddWatchMode::Persistent is
impossible
* fixup! Assert removing WatcherType::Data from AddWatchMode::Persistent is
impossible
---
.../main/java/org/apache/zookeeper/Watcher.java | 7 +-
.../java/org/apache/zookeeper/ZKWatchManager.java | 40 +-
.../apache/zookeeper/cli/RemoveWatchesCommand.java | 6 +
.../java/org/apache/zookeeper/server/DataTree.java | 20 +-
.../org/apache/zookeeper/RemoveWatchesTest.java | 629 ++++++++++++++-------
5 files changed, 481 insertions(+), 221 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
index ab4b65488..6347fa456 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
@@ -191,6 +191,8 @@ public interface Watcher {
enum WatcherType {
Children(1),
Data(2),
+ Persistent(4),
+ PersistentRecursive(5),
Any(3);
// Integer representation of value
@@ -212,7 +214,10 @@ public interface Watcher {
return WatcherType.Data;
case 3:
return WatcherType.Any;
-
+ case 4:
+ return Persistent;
+ case 5:
+ return PersistentRecursive;
default:
throw new RuntimeException("Invalid integer value for
conversion to WatcherType");
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
index 9da424944..514da01cf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java
@@ -153,6 +153,18 @@ class ZKWatchManager implements ClientWatchManager {
}
break;
}
+ case Persistent: {
+ synchronized (persistentWatches) {
+ removedWatcher = removeWatches(persistentWatches, watcher,
clientPath, local, rc, persistentWatchersToRem);
+ }
+ break;
+ }
+ case PersistentRecursive: {
+ synchronized (persistentRecursiveWatches) {
+ removedWatcher = removeWatches(persistentRecursiveWatches,
watcher, clientPath, local, rc, persistentWatchersToRem);
+ }
+ break;
+ }
case Any: {
synchronized (childWatches) {
removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
@@ -225,18 +237,6 @@ class ZKWatchManager implements ClientWatchManager {
synchronized (childWatches) {
containsWatcher = contains(path, watcher, childWatches);
}
-
- synchronized (persistentWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentWatches);
- containsWatcher |= contains_temp;
- }
-
- synchronized (persistentRecursiveWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentRecursiveWatches);
- containsWatcher |= contains_temp;
- }
break;
}
case Data: {
@@ -248,17 +248,17 @@ class ZKWatchManager implements ClientWatchManager {
boolean contains_temp = contains(path, watcher, existWatches);
containsWatcher |= contains_temp;
}
-
+ break;
+ }
+ case Persistent: {
synchronized (persistentWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentWatches);
- containsWatcher |= contains_temp;
+ containsWatcher |= contains(path, watcher, persistentWatches);
}
-
+ break;
+ }
+ case PersistentRecursive: {
synchronized (persistentRecursiveWatches) {
- boolean contains_temp = contains(path, watcher,
- persistentRecursiveWatches);
- containsWatcher |= contains_temp;
+ containsWatcher |= contains(path, watcher,
persistentRecursiveWatches);
}
break;
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java
index ddaf15fba..748f0c2f2 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java
@@ -37,6 +37,8 @@ public class RemoveWatchesCommand extends CliCommand {
static {
options.addOption("c", false, "child watcher type");
options.addOption("d", false, "data watcher type");
+ options.addOption("p", false, "persistent watcher type");
+ options.addOption("r", false, "persistent recursive watcher type");
options.addOption("a", false, "any watcher type");
options.addOption("l", false, "remove locally when there is no server
connection");
}
@@ -70,6 +72,10 @@ public class RemoveWatchesCommand extends CliCommand {
wtype = WatcherType.Children;
} else if (cl.hasOption("d")) {
wtype = WatcherType.Data;
+ } else if (cl.hasOption("p")) {
+ wtype = WatcherType.Persistent;
+ } else if (cl.hasOption("r")) {
+ wtype = WatcherType.PersistentRecursive;
} else if (cl.hasOption("a")) {
wtype = WatcherType.Any;
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index c6e2ff456..1f0e7a749 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -1563,11 +1563,16 @@ public class DataTree {
case Data:
containsWatcher = this.dataWatches.containsWatcher(path, watcher,
WatcherMode.STANDARD);
break;
+ case Persistent:
+ containsWatcher = this.dataWatches.containsWatcher(path, watcher,
WatcherMode.PERSISTENT);
+ break;
+ case PersistentRecursive:
+ containsWatcher = this.dataWatches.containsWatcher(path, watcher,
WatcherMode.PERSISTENT_RECURSIVE);
+ break;
case Any:
if (this.childWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
- }
- if (this.dataWatches.containsWatcher(path, watcher, null)) {
+ } else if (this.dataWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
}
break;
@@ -1584,6 +1589,17 @@ public class DataTree {
case Data:
removed = this.dataWatches.removeWatcher(path, watcher,
WatcherMode.STANDARD);
break;
+ case Persistent:
+ if (this.childWatches.removeWatcher(path, watcher,
WatcherMode.PERSISTENT)) {
+ removed = true;
+ }
+ if (this.dataWatches.removeWatcher(path, watcher,
WatcherMode.PERSISTENT)) {
+ removed = true;
+ }
+ break;
+ case PersistentRecursive:
+ removed = this.dataWatches.removeWatcher(path, watcher,
WatcherMode.PERSISTENT_RECURSIVE);
+ break;
case Any:
if (this.childWatches.removeWatcher(path, watcher, null)) {
removed = true;
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
index a5de63596..5f3cf9b1d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
@@ -97,9 +99,16 @@ public class RemoveWatchesTest extends ClientBase {
MyCallback c1 = new MyCallback(rc.intValue(), path);
zk.removeWatches(path, watcher, watcherType, local, c1, null);
assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
- if (KeeperException.Code.OK.intValue() != c1.rc) {
- KeeperException ke =
KeeperException.create(KeeperException.Code.get(c1.rc));
- throw ke;
+ if (rc.intValue() != c1.rc) {
+ throw KeeperException.create(KeeperException.Code.get(c1.rc));
+ }
+ } else if (rc != Code.OK) {
+ try {
+ zk.removeWatches(path, watcher, watcherType, local);
+ fail("expect exception code " + rc);
+ } catch (KeeperException ex) {
+ assertEquals(rc, ex.code());
+ assertEquals(path, ex.getPath());
}
} else {
zk.removeWatches(path, watcher, watcherType, local);
@@ -118,15 +127,50 @@ public class RemoveWatchesTest extends ClientBase {
MyCallback c1 = new MyCallback(rc.intValue(), path);
zk.removeAllWatches(path, watcherType, local, c1, null);
assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
- if (KeeperException.Code.OK.intValue() != c1.rc) {
- KeeperException ke =
KeeperException.create(KeeperException.Code.get(c1.rc));
- throw ke;
+ if (rc.intValue() != c1.rc) {
+ throw KeeperException.create(KeeperException.Code.get(c1.rc));
+ }
+ } else if (rc != Code.OK) {
+ try {
+ zk.removeAllWatches(path, watcherType, local);
+ fail("expect exception code " + rc);
+ } catch (KeeperException ex) {
+ assertEquals(rc, ex.code());
+ assertEquals(path, ex.getPath());
}
} else {
zk.removeAllWatches(path, watcherType, local);
}
}
+ private void assertWatchers(ZooKeeper zk, String path, WatcherType...
watcherTypes) {
+ for (WatcherType watcherType : watcherTypes) {
+ String msg = String.format("expect watcher for path %s and type
%s", path, watcherType);
+ assertTrue(isServerSessionWatcher(zk.getSessionId(), path,
watcherType), msg);
+ }
+ }
+
+ private void assertNoWatchers(ZooKeeper zk, String path, WatcherType...
watcherTypes) {
+ for (WatcherType watcherType : watcherTypes) {
+ String msg = String.format("expect no watcher for path %s and type
%s", path, watcherType);
+ assertFalse(isServerSessionWatcher(zk.getSessionId(), path,
watcherType), msg);
+ }
+ }
+
+ private void assertWatchersExcept(ZooKeeper zk, String path,
WatcherType... watcherTypes) {
+ List<WatcherType> excludes = Arrays.asList(watcherTypes);
+ for (WatcherType watcherType : WatcherType.values()) {
+ if (watcherType == WatcherType.Any) {
+ continue;
+ }
+ if (excludes.contains(watcherType)) {
+ assertNoWatchers(zk, path, watcherType);
+ } else {
+ assertWatchers(zk, path, watcherType);
+ }
+ }
+ }
+
/**
* Test verifies removal of single watcher when there is server connection
*/
@@ -338,6 +382,96 @@ public class RemoveWatchesTest extends ClientBase {
assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get
NodeDataChanged event");
}
+ /**
+ * Test verifies removing all watcher with WatcherType.Persistent.
+ *
+ * <p>All other watchers shouldn't be removed.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveAllPersistentWatchers(boolean useAsync) throws
InterruptedException, KeeperException {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ BlockingDeque<WatchedEvent> persistentEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> persistentEvents2 = new
LinkedBlockingDeque<>();
+
+ Watcher persistentWatcher1 = persistentEvents1::add;
+ Watcher persistentWatcher2 = persistentEvents2::add;
+ zk2.addWatch("/node1", persistentWatcher1, AddWatchMode.PERSISTENT);
+ zk2.addWatch("/node1", persistentWatcher2, AddWatchMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childrenEvents = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents = new
LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents::add, null);
+ zk2.getChildren("/node1", childrenEvents::add);
+ zk2.addWatch("/node1", recursiveEvents::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+
+ removeWatches(zk2, "/node1", persistentWatcher1,
WatcherType.Persistent, false, Code.OK, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher2,
WatcherType.Persistent, false, Code.OK, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Data,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Data,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Children,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Children,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher1,
WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", persistentWatcher2,
WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync);
+
+ zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
+ assertEvent(persistentEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(persistentEvents2, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+ assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/node2");
+ assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+ }
+
+ /**
+ * Test verifies removing all watcher with WatcherType.PersistentRecursive.
+ *
+ * <p>All other watchers shouldn't be removed
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveAllPersistentRecursiveWatchers(boolean useAsync)
throws InterruptedException, KeeperException {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ BlockingDeque<WatchedEvent> recursiveEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents2 = new
LinkedBlockingDeque<>();
+
+ Watcher recursiveWatcher1 = recursiveEvents1::add;
+ Watcher recursiveWatcher2 = recursiveEvents2::add;
+ zk2.addWatch("/node1", recursiveWatcher1,
AddWatchMode.PERSISTENT_RECURSIVE);
+ zk2.addWatch("/node1", recursiveWatcher2,
AddWatchMode.PERSISTENT_RECURSIVE);
+
+ BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childrenEvents = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents::add, null);
+ zk2.getChildren("/node1", childrenEvents::add);
+ zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+
+ removeWatches(zk2, "/node1", recursiveWatcher1,
WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher2,
WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Data,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Data,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Children,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Children,
false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher1,
WatcherType.Persistent, false, Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", recursiveWatcher2,
WatcherType.Persistent, false, Code.NOWATCHER, useAsync);
+
+ assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved,
"/node1");
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", "test".getBytes(), -1);
+
+ assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+ assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+ }
/**
* Test verifies given watcher doesn't exists!
*/
@@ -360,30 +494,10 @@ public class RemoveWatchesTest extends ClientBase {
// New Watcher which will be used for removal
MyWatcher w3 = new MyWatcher("/node1", 2);
- try {
- removeWatches(zk2, "/node1", w3, WatcherType.Any, false,
Code.NOWATCHER, useAsync);
- fail("Should throw exception as given watcher doesn't exists");
- } catch (KeeperException.NoWatcherException nwe) {
- // expected
- }
- try {
- removeWatches(zk2, "/node1", w3, WatcherType.Children, false,
Code.NOWATCHER, useAsync);
- fail("Should throw exception as given watcher doesn't exists");
- } catch (KeeperException.NoWatcherException nwe) {
- // expected
- }
- try {
- removeWatches(zk2, "/node1", w3, WatcherType.Data, false,
Code.NOWATCHER, useAsync);
- fail("Should throw exception as given watcher doesn't exists");
- } catch (KeeperException.NoWatcherException nwe) {
- // expected
- }
- try {
- removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false,
Code.NOWATCHER, useAsync);
- fail("Should throw exception as given watcher doesn't exists");
- } catch (KeeperException.NoWatcherException nwe) {
- // expected
- }
+ removeWatches(zk2, "/node1", w3, WatcherType.Any, false,
Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", w3, WatcherType.Children, false,
Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/node1", w3, WatcherType.Data, false,
Code.NOWATCHER, useAsync);
+ removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false,
Code.NOWATCHER, useAsync);
}
/**
@@ -461,12 +575,7 @@ public class RemoveWatchesTest extends ClientBase {
removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK,
useAsync);
assertTrue(w2.matches(), "Didn't remove child watcher");
assertFalse(w1.matches(), "Shouldn't remove data watcher");
- try {
- removeWatches(zk2, "/node1", w1, WatcherType.Any, false,
Code.CONNECTIONLOSS, useAsync);
- fail("Should throw exception as last watch removal requires server
connection");
- } catch (KeeperException.ConnectionLossException nwe) {
- // expected
- }
+ removeWatches(zk2, "/node1", w1, WatcherType.Any, false,
Code.CONNECTIONLOSS, useAsync);
assertFalse(w1.matches(), "Shouldn't remove data watcher");
// when local=true, here if connection not available, simply removes
@@ -682,25 +791,17 @@ public class RemoveWatchesTest extends ClientBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
@Timeout(value = 90)
- public void testNoWatcherServerException(boolean useAsync) throws
InterruptedException, IOException, TimeoutException {
+ public void testNoWatcherServerException(boolean useAsync) throws
KeeperException, InterruptedException, IOException, TimeoutException {
CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT,
watcher));
MyWatchManager watchManager = new MyWatchManager(false, watcher);
doReturn(watchManager).when(zk).getWatchManager();
- boolean nw = false;
watcher.waitForConnected(CONNECTION_TIMEOUT);
- try {
- zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false);
- } catch (KeeperException nwe) {
- if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
- nw = true;
- }
- }
+ removeWatches(zk, "/nowatchhere", watcher, WatcherType.Data, false,
Code.NOWATCHER, useAsync);
assertThat("Server didn't return NOWATCHER",
watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
- assertThat("NoWatcherException didn't happen", nw, is(true));
}
/**
@@ -711,12 +812,7 @@ public class RemoveWatchesTest extends ClientBase {
@Timeout(value = 90)
public void testRemoveAllNoWatcherException(boolean useAsync) throws
IOException, InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- try {
- removeAllWatches(zk2, "/node1", WatcherType.Any, false,
Code.NOWATCHER, useAsync);
- fail("Should throw exception as given watcher doesn't exists");
- } catch (KeeperException.NoWatcherException nwe) {
- // expected
- }
+ removeAllWatches(zk2, "/node1", WatcherType.Any, false,
Code.NOWATCHER, useAsync);
}
/**
@@ -810,213 +906,342 @@ public class RemoveWatchesTest extends ClientBase {
}
/**
- * Test verifies WatcherType.Data - removes only the configured data
watcher
- * function
+ * Test verifies {@link WatcherType#Persistent} - removes only the
configured watcher function
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveWhenMultiplePersistentWatchesOnAPath(boolean
useAsync) throws Exception {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> persistentEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> persistentEvents2 = new
LinkedBlockingDeque<>();
+ Watcher w1 = persistentEvents1::add;
+ // Add multiple persistent watches
+ zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT);
+ zk2.addWatch("/node1", persistentEvents2::add,
AddWatchMode.PERSISTENT);
+
+ removeWatches(zk2, "/node1", w1, WatcherType.Persistent, false,
Code.OK, useAsync);
+ assertEvent(persistentEvents1, EventType.PersistentWatchRemoved,
"/node1");
+
+ zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ assertEvent(persistentEvents2, EventType.NodeChildrenChanged,
"/node1");
+ assertNoEvent(persistentEvents1);
+ }
+
+ /**
+ * Test verifies {@link WatcherType#PersistentRecursive} - removes only
the configured watcher function
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void
testRemoveWhenMultiplePersistentRecursiveWatchesOnAPath(boolean useAsync)
throws Exception {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> recursiveEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents2 = new
LinkedBlockingDeque<>();
+ Watcher w1 = recursiveEvents1::add;
+ // Add multiple persistent recursive watches
+ zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT_RECURSIVE);
+ zk2.addWatch("/node1", recursiveEvents2::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+
+ removeWatches(zk2, "/node1", w1, WatcherType.PersistentRecursive,
false, Code.OK, useAsync);
+ assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved,
"/node1");
+
+ zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ assertEvent(recursiveEvents2, EventType.NodeCreated, "/node1/node2");
+ assertNoEvent(recursiveEvents1);
+ }
+
+ /**
+ * Test verifies {@link OpCode#checkWatches} {@link
WatcherType#Persistent} using {@link WatcherType#Data}.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemovePersistentWatchesOnAPathPartially(boolean useAsync)
throws Exception {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
+ Watcher persistentWatcher = persistentEvents::add;
+ zk2.addWatch("/node1", persistentWatcher, AddWatchMode.PERSISTENT);
+
+ assertWatchers(zk2, "/node1", WatcherType.Persistent);
+ assertNoWatchers(zk2, "/node1", WatcherType.Data);
+ removeWatches(zk2, "/node1", persistentWatcher, WatcherType.Data,
false, Code.NOWATCHER, useAsync);
+ assertWatchers(zk2, "/node1", WatcherType.Persistent);
+ assertNoWatchers(zk2, "/node1", WatcherType.Data);
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
+ assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+
+ assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Data}.
+ *
+ * <p>All other watcher types shouldn't be removed.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
@Timeout(value = 90)
public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws
Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- final CountDownLatch dWatchCount = new CountDownLatch(2);
- final CountDownLatch rmWatchCount = new CountDownLatch(2);
- Watcher w1 = event -> {
- switch (event.getType()) {
- case DataWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeDataChanged:
- dWatchCount.countDown();
- break;
- default:
- break;
- }
- };
- Watcher w2 = event -> {
- switch (event.getType()) {
- case DataWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeDataChanged:
- dWatchCount.countDown();
- break;
- default:
- break;
- }
- };
+
+ BlockingDeque<WatchedEvent> dataEvents1 = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> dataEvents2 = new LinkedBlockingDeque<>();
// Add multiple data watches
- LOG.info("Adding data watcher {} on path {}", w1, "/node1");
- assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
- LOG.info("Adding data watcher {} on path {}", w2, "/node1");
- assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
+ zk2.getData("/node1", dataEvents1::add, null);
+ zk2.getData("/node1", dataEvents2::add, null);
+ BlockingDeque<WatchedEvent> childrenEvents = new
LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> recursiveEvents = new
LinkedBlockingDeque<>();
+ zk2.getChildren("/node1", childrenEvents::add);
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
zk2.addWatch("/node1", recursiveEvents::add,
AddWatchMode.PERSISTENT_RECURSIVE);
- assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data), "Server session is not a watcher");
+ assertWatchers(zk2, "/node1", WatcherType.values());
removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK,
useAsync);
- assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS), "Didn't remove data watcher");
-
- assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data), "Server session is still a watcher after removal");
+ assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1");
+ assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1");
+ assertWatchersExcept(zk2, "/node1", WatcherType.Data);
zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- zk1.setData("/node1/child", new byte[0], -1);
- zk1.delete("/node1/child", -1);
- zk1.setData("/node1", new byte[0], -1);
- zk1.delete("/node1", -1);
+ zk1.setData("/node1", null, -1);
+
+ assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
- assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
- assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
- assertEvent(recursiveEvents, EventType.NodeDataChanged,
"/node1/child");
- assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
- assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1");
- assertEquals(2, dWatchCount.getCount(), "Received watch notification
after removal!");
+ assertNoEvent(dataEvents1);
+ assertNoEvent(dataEvents2);
}
/**
- * Test verifies WatcherType.Children - removes only the configured child
- * watcher function
+ * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Children}.
+ *
+ * <p>All other watcher types shouldn't be removed.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
@Timeout(value = 90)
public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws
Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- final CountDownLatch cWatchCount = new CountDownLatch(2);
- final CountDownLatch rmWatchCount = new CountDownLatch(2);
- Watcher w1 = event -> {
- switch (event.getType()) {
- case ChildWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeChildrenChanged:
- cWatchCount.countDown();
- break;
- default:
- break;
- }
- };
- Watcher w2 = event -> {
- switch (event.getType()) {
- case ChildWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeChildrenChanged:
- cWatchCount.countDown();
- break;
- default:
- break;
- }
- };
+
+ BlockingDeque<WatchedEvent> childrenEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childrenEvents2 = new
LinkedBlockingDeque<>();
// Add multiple child watches
- LOG.info("Adding child watcher {} on path {}", w1, "/node1");
- assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set
child watches");
- LOG.info("Adding child watcher {} on path {}", w2, "/node1");
- assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set
child watches");
+ zk2.getChildren("/node1", childrenEvents1::add);
+ zk2.getChildren("/node1", childrenEvents2::add);
+ BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents = new
LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents::add, null);
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+ zk2.addWatch("/node1", recursiveEvents::add,
AddWatchMode.PERSISTENT_RECURSIVE);
- assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children), "Server session is not a watcher");
+ assertWatchers(zk2, "/node1", WatcherType.values());
removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK,
useAsync);
- assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS), "Didn't remove child watcher");
-
- assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children), "Server session is still a watcher after removal");
+ assertEvent(childrenEvents1, EventType.ChildWatchRemoved, "/node1");
+ assertEvent(childrenEvents2, EventType.ChildWatchRemoved, "/node1");
+ assertWatchersExcept(zk2, "/node1", WatcherType.Children);
zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- zk1.setData("/node1/child", new byte[0], -1);
- zk1.delete("/node1/child", -1);
- zk1.setData("/node1", new byte[0], -1);
- zk1.delete("/node1", -1);
+ zk1.setData("/node1", null, -1);
+ assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+ assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
+ assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+
+ assertNull(childrenEvents1.poll(10, TimeUnit.MILLISECONDS));
+ assertNull(childrenEvents2.poll(10, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent}.
+ *
+ * <p>All other watcher types shouldn't be removed.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveAllPersistentWatchesOnAPath(boolean useAsync) throws
Exception {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> persistentEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> persistentEvents2 = new
LinkedBlockingDeque<>();
+ // Add multiple persistent watches
+ zk2.addWatch("/node1", persistentEvents1::add,
AddWatchMode.PERSISTENT);
+ zk2.addWatch("/node1", persistentEvents2::add,
AddWatchMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childrenEvents = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents = new
LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents::add, null);
+ zk2.getChildren("/node1", childrenEvents::add, null);
+ zk2.addWatch("/node1", recursiveEvents::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+
+ assertWatchers(zk2, "/node1", WatcherType.values());
+ removeAllWatches(zk2, "/node1", WatcherType.Persistent, false,
Code.OK, useAsync);
+ assertEvent(persistentEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(persistentEvents2, EventType.PersistentWatchRemoved,
"/node1");
+ assertWatchersExcept(zk2, "/node1", WatcherType.Persistent);
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
+ assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+ assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child1");
+ assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+
+ assertNull(persistentEvents1.poll(10, TimeUnit.MILLISECONDS));
+ assertNull(persistentEvents2.poll(10, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Test verifies that {@link OpCode#removeWatches} with {@link WatcherType#Data} does not remove {@link WatcherType#Persistent} watches.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveAllPersistentWatchesOnAPathPartially(boolean
useAsync) throws Exception {
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
+ zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+
+ assertWatchers(zk2, "/node1", WatcherType.Persistent);
+ assertNoWatchers(zk2, "/node1", WatcherType.Data);
+ removeAllWatches(zk2, "/node1", WatcherType.Data, false,
Code.NOWATCHER, useAsync);
+ assertWatchers(zk2, "/node1", WatcherType.Persistent);
+ assertNoWatchers(zk2, "/node1", WatcherType.Data);
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
- assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
- assertEquals(2, cWatchCount.getCount(), "Received watch notification
after removal!");
+ assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
}
/**
- * Test verifies WatcherType.Any - removes all the configured child,data
- * watcher functions
+ * Test verifies {@link OpCode#removeWatches} {@link WatcherType#PersistentRecursive}.
+ *
+ * <p>All other watcher types shouldn't be removed.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
@Timeout(value = 90)
- public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception
{
+ public void testRemoveAllPersistentRecursiveWatchesOnAPath(boolean
useAsync) throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- final CountDownLatch watchCount = new CountDownLatch(2);
- final CountDownLatch rmWatchCount = new CountDownLatch(4);
- Watcher w1 = event -> {
- switch (event.getType()) {
- case ChildWatchRemoved:
- case DataWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeChildrenChanged:
- case NodeDataChanged:
- watchCount.countDown();
- break;
- default:
- break;
- }
- };
- Watcher w2 = event -> {
- switch (event.getType()) {
- case ChildWatchRemoved:
- case DataWatchRemoved:
- rmWatchCount.countDown();
- break;
- case NodeChildrenChanged:
- case NodeDataChanged:
- watchCount.countDown();
- break;
- default:
- break;
- }
- };
- // Add multiple child watches
- LOG.info("Adding child watcher {} on path {}", w1, "/node1");
- assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set
child watches");
- LOG.info("Adding child watcher {} on path {}", w2, "/node1");
- assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set
child watches");
- // Add multiple data watches
- LOG.info("Adding data watcher {} on path {}", w1, "/node1");
- assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
- LOG.info("Adding data watcher {} on path {}", w2, "/node1");
- assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
+ BlockingDeque<WatchedEvent> recursiveEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents2 = new
LinkedBlockingDeque<>();
+ // Add multiple persistent recursive watches
+ zk2.addWatch("/node1", recursiveEvents1::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+ zk2.addWatch("/node1", recursiveEvents2::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+ BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childrenEvents = new
LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> persistentEvents = new
LinkedBlockingDeque<>();
- BlockingDeque<WatchedEvent> recursiveEvents = new
LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents::add, null);
+ zk2.getChildren("/node1", childrenEvents::add, null);
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
- zk2.addWatch("/node1", recursiveEvents::add,
AddWatchMode.PERSISTENT_RECURSIVE);
- assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Any), "Server session is not a watcher");
- assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data), "Server session is not a watcher");
- assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children), "Server session is not a watcher");
+ assertWatchers(zk2, "/node1", WatcherType.values());
+ removeAllWatches(zk2, "/node1", WatcherType.PersistentRecursive,
false, Code.OK, useAsync);
+ assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved,
"/node1");
+ assertWatchersExcept(zk2, "/node1", WatcherType.PersistentRecursive);
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
+ assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+ assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+ assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+
+ assertNull(recursiveEvents1.poll(10, TimeUnit.MILLISECONDS));
+ assertNull(recursiveEvents2.poll(10, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Any}.
+ *
+ * <p>All watcher types should be removed.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @Timeout(value = 90)
+ public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception
{
+ zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // Add multiple child watches
+ BlockingDeque<WatchedEvent> childEvents1 = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> childEvents2 = new LinkedBlockingDeque<>();
+ zk2.getChildren("/node1", childEvents1::add);
+ zk2.getChildren("/node1", childEvents2::add);
+
+ // Add multiple data watches
+ BlockingDeque<WatchedEvent> dataEvents1 = new LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> dataEvents2 = new LinkedBlockingDeque<>();
+ zk2.getData("/node1", dataEvents1::add, null);
+ zk2.exists("/node1", dataEvents2::add);
+
+ // Add multiple persistent watches
+ BlockingDeque<WatchedEvent> persistentEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> persistentEvents2 = new
LinkedBlockingDeque<>();
+ zk2.addWatch("/node1", persistentEvents1::add,
AddWatchMode.PERSISTENT);
+ zk2.addWatch("/node1", persistentEvents2::add,
AddWatchMode.PERSISTENT);
+
+ // Add multiple recursive watches
+ BlockingDeque<WatchedEvent> recursiveEvents1 = new
LinkedBlockingDeque<>();
+ BlockingDeque<WatchedEvent> recursiveEvents2 = new
LinkedBlockingDeque<>();
+ zk2.addWatch("/node1", recursiveEvents1::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+ zk2.addWatch("/node1", recursiveEvents2::add,
AddWatchMode.PERSISTENT_RECURSIVE);
+
+ assertWatchers(zk2, "/node1", WatcherType.values());
removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK,
useAsync);
- assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS), "Didn't remove data watcher");
- assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Any), "Server session is still a watcher after removal");
- assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data), "Server session is still a watcher after removal");
- assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children), "Server session is still a watcher after removal");
- assertEvent(persistentEvents, EventType.PersistentWatchRemoved,
"/node1");
- assertEvent(recursiveEvents, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(childEvents1, EventType.ChildWatchRemoved, "/node1");
+ assertEvent(childEvents2, EventType.ChildWatchRemoved, "/node1");
- zk1.delete("/node1", -1);
- assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
- assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS));
- assertEquals(2, watchCount.getCount(), "Received watch notification
after removal!");
+ assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1");
+ assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1");
+
+ assertEvent(persistentEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(persistentEvents2, EventType.PersistentWatchRemoved,
"/node1");
+
+ assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved,
"/node1");
+ assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved,
"/node1");
+ assertNoWatchers(zk2, "/node1", WatcherType.values());
+
+ zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk1.setData("/node1", null, -1);
+
+ assertNoEvent(childEvents1);
+ assertNoEvent(childEvents2);
+ assertNoEvent(dataEvents1);
+ assertNoEvent(dataEvents2);
+ assertNoEvent(persistentEvents1);
+ assertNoEvent(persistentEvents2);
+ assertNoEvent(recursiveEvents1);
+ assertNoEvent(recursiveEvents2);
}
private static class MyWatchManager extends ZKWatchManager {
@@ -1159,4 +1384,12 @@ public class RemoveWatchesTest extends ClientBase {
assertEquals(eventType, event.getType());
assertEquals(path, event.getPath());
}
+
+ /**
+ * Asserts no event from queue in a short period.
+ */
+ private void assertNoEvent(BlockingQueue<WatchedEvent> events) throws
InterruptedException {
+ // Short timeout so we don't hurt CI too much. It will eventually fail given enough runs if there are bugs.
+ assertNull(events.poll(10, TimeUnit.MILLISECONDS));
+ }
}