This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes in repository https://gitbox.apache.org/repos/asf/curator.git
commit ca5e3b2b953dc0c0fd111317b7c206449da3ac69 Author: randgalt <[email protected]> AuthorDate: Sat Nov 2 11:40:44 2019 -0500 CURATOR-549 Bring Curator up to ZooKeeper 3.5.6 in preparation for supporting persistent recursive watchers while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x. ZooKeeper 3.6.0 has some significant changes from previous versions. The reconfig APIs have moved into a new class, ZooKeeperAdmin. This class existed in 3.5.x but wasn't required. Now it is. A bunch of little things changed in the ZK server code [...] There is a new module, curator-test-zk35. It forces ZooKeeper 3.5.6 and performs selected tests from the other modules to ensure compatibility. Tests annotated with TestNG groups zk35 and zk35Compatibility are tested. Group zk36 is excluded. Note: these tests will only run from Maven. I don't think IntelliJ/Eclipse support the Maven syntax I used. Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x --- .../curator/framework/imps/TestWatchesBuilder.java | 431 +++++++++++---------- 1 file changed, 222 insertions(+), 209 deletions(-) diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java index 26c41f1..b65e85c 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.curator.framework.imps; import org.apache.curator.framework.CuratorFramework; @@ -55,7 +56,7 @@ public class TestWatchesBuilder extends CuratorTestBase final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { - + @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { @@ -66,13 +67,13 @@ public class TestWatchesBuilder extends CuratorTestBase } } }); - + return state; } - + private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState) { - if(stateRef.get() == desiredState) + if ( stateRef.get() == desiredState ) { return true; } @@ -80,55 +81,55 @@ public class TestWatchesBuilder extends CuratorTestBase //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized(stateRef) { - if(stateRef.get() == desiredState) + if ( stateRef.get() == desiredState ) { return true; } - + try { stateRef.wait(timing.milliseconds()); return stateRef.get() == desiredState; } - catch(InterruptedException e) + catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return false; } } } - + @Test public void testRemoveCuratorDefaultWatcher() throws Exception { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + final CountDownLatch removedLatch = new CountDownLatch(1); - - final String path = "/"; + + final String path = "/"; client.getCuratorListenable().addListener(new CuratorListener() - { + { @Override - public void eventReceived(CuratorFramework client, CuratorEvent event) - throws Exception + public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { - if(event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved) { + if ( event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved ) + { removedLatch.countDown(); - } + } } }); - + client.checkExists().watched().forPath(path); - + client.watches().removeAll().forPath(path); - + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally @@ -136,34 +137,35 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + @Test public void testRemoveCuratorWatch() throws Exception - { + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + final CountDownLatch removedLatch = new CountDownLatch(1); - - final String path = "/"; + + final String path = "/"; CuratorWatcher watcher = new CuratorWatcher() { - + @Override public void process(WatchedEvent event) throws Exception { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { + if ( event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved ) + { removedLatch.countDown(); } } }; - + client.checkExists().usingWatcher(watcher).forPath(path); client.watches().remove(watcher).forPath(path); @@ -174,25 +176,25 @@ public class TestWatchesBuilder extends CuratorTestBase { CloseableUtils.closeQuietly(client); } - } - + } + @Test public void testRemoveWatch() throws Exception - { + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); try { client.start(); - + final CountDownLatch removedLatch = new CountDownLatch(1); - - final String path = "/"; + + final String path = "/"; Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + client.checkExists().usingWatcher(watcher).forPath(path); client.watches().remove(watcher).forPath(path); @@ -204,97 +206,97 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + @Test public void testRemoveWatchInBackgroundWithCallback() throws Exception - { + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try - { + { client.start(); - + //Make sure that the event fires on both the watcher and the callback. final CountDownLatch removedLatch = new CountDownLatch(2); final String path = "/"; Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + BackgroundCallback callback = new BackgroundCallback() { - + @Override - public void processResult(CuratorFramework client, CuratorEvent event) - throws Exception + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - if(event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path)) { + if ( event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path) ) + { removedLatch.countDown(); } } }; - + client.checkExists().usingWatcher(watcher).forPath(path); client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); - + } finally { CloseableUtils.closeQuietly(client); } } - + @Test public void testRemoveWatchInBackgroundWithNoCallback() throws Exception - { + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(1); Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + client.checkExists().usingWatcher(watcher).forPath(path); client.watches().remove(watcher).inBackground().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); - + } finally { CloseableUtils.closeQuietly(client); } - } - + } + @Test public void testRemoveAllWatches() throws Exception - { + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); try { client.start(); - + final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(2); - - Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); - Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + + Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); + client.getChildren().usingWatcher(watcher1).forPath(path); client.checkExists().usingWatcher(watcher2).forPath(path); @@ -306,32 +308,32 @@ public class TestWatchesBuilder extends CuratorTestBase { CloseableUtils.closeQuietly(client); } - } - + } + @Test public void testRemoveAllDataWatches() throws Exception - { + { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + final String path = "/"; final AtomicBoolean removedFlag = new AtomicBoolean(false); final CountDownLatch removedLatch = new CountDownLatch(1); - - Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved); - Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + + Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); + client.getChildren().usingWatcher(watcher1).forPath(path); client.checkExists().usingWatcher(watcher2).forPath(path); - + client.watches().removeAll().ofType(WatcherType.Data).forPath(path); - + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); Assert.assertEquals(removedFlag.get(), false); } @@ -340,31 +342,31 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + @Test public void testRemoveAllChildWatches() throws Exception - { + { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + final String path = "/"; final AtomicBoolean removedFlag = new AtomicBoolean(false); final CountDownLatch removedLatch = new CountDownLatch(1); - - Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved); - Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); - + + Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); + client.checkExists().usingWatcher(watcher1).forPath(path); client.getChildren().usingWatcher(watcher2).forPath(path); - + client.watches().removeAll().ofType(WatcherType.Children).forPath(path); - + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); Assert.assertEquals(removedFlag.get(), false); } @@ -372,34 +374,35 @@ public class TestWatchesBuilder extends CuratorTestBase { CloseableUtils.closeQuietly(client); } - } - + } + @Test - public void testRemoveLocalWatch() throws Exception { + public void testRemoveLocalWatch() throws Exception + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); - + final String path = "/"; - + final CountDownLatch removedLatch = new CountDownLatch(1); - - Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); + client.checkExists().usingWatcher(watcher).forPath(path); //Stop the server so we can check if we can remove watches locally when offline server.stop(); - + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); - + client.watches().removeAll().locally().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); @@ -409,33 +412,34 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + @Test - public void testRemoveLocalWatchInBackground() throws Exception { + public void testRemoveLocalWatchInBackground() throws Exception + { Timing timing = new Timing(); CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); - + final String path = "/"; - + final CountDownLatch removedLatch = new CountDownLatch(1); - - Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - + + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); + client.checkExists().usingWatcher(watcher).forPath(path); //Stop the server so we can check if we can remove watches locally when offline server.stop(); - + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); - + client.watches().removeAll().locally().inBackground().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); @@ -444,8 +448,8 @@ public class TestWatchesBuilder extends CuratorTestBase { CloseableUtils.closeQuietly(client); } - } - + } + /** * Test the case where we try and remove an unregistered watcher. In this case we expect a NoWatcherException to * be thrown. @@ -455,21 +459,22 @@ public class TestWatchesBuilder extends CuratorTestBase public void testRemoveUnregisteredWatcher() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); try { client.start(); - - final String path = "/"; - Watcher watcher = new Watcher() { + + final String path = "/"; + Watcher watcher = new Watcher() + { @Override public void process(WatchedEvent event) { - } + } }; - + try { client.watches().remove(watcher).forPath(path); @@ -485,7 +490,7 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + /** * Test the case where we try and remove an unregistered watcher but have the quietly flag set. In this case we expect success. 
* @throws Exception @@ -495,22 +500,22 @@ public class TestWatchesBuilder extends CuratorTestBase { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); try { client.start(); - + final AtomicBoolean watcherRemoved = new AtomicBoolean(false); - - final String path = "/"; + + final String path = "/"; Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved); - + client.watches().remove(watcher).quietly().forPath(path); - + timing.sleepABit(); - + //There should be no watcher removed as none were registered. Assert.assertEquals(watcherRemoved.get(), false); } @@ -519,94 +524,94 @@ public class TestWatchesBuilder extends CuratorTestBase CloseableUtils.closeQuietly(client); } } - + @Test - public void testGuaranteedRemoveWatch() throws Exception { + public void testGuaranteedRemoveWatch() throws Exception + { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). - connectString(server.getConnectString()). - retryPolicy(new RetryOneTime(1)). - build(); + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). 
+ build(); try { client.start(); - + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); - + String path = "/"; - + CountDownLatch removeLatch = new CountDownLatch(1); - - Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); + + Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - - server.stop(); - + + server.stop(); + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); - + //Remove the watch while we're not connected - try + try { client.watches().remove(watcher).guaranteed().forPath(path); Assert.fail(); } - catch(KeeperException.ConnectionLossException e) + catch ( KeeperException.ConnectionLossException e ) { //Expected } - + server.restart(); - - timing.awaitLatch(removeLatch); + + timing.awaitLatch(removeLatch); } finally { CloseableUtils.closeQuietly(client); } } - + @Test - public void testGuaranteedRemoveWatchInBackground() throws Exception { + public void testGuaranteedRemoveWatchInBackground() throws Exception + { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), - new ExponentialBackoffRetry(100, 3)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3)); try { client.start(); - + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); - + final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1); - + ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>() { @Override - public void pathAddedForGuaranteedOperation( - FailedRemoveWatchDetails detail) + public void 
pathAddedForGuaranteedOperation(FailedRemoveWatchDetails detail) { guaranteeAddedLatch.countDown(); } }; - + String path = "/"; - + CountDownLatch removeLatch = new CountDownLatch(1); - - Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); + + Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - - server.stop(); + + server.stop(); Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); - + //Remove the watch while we're not connected client.watches().remove(watcher).guaranteed().inBackground().forPath(path); - + timing.awaitLatch(guaranteeAddedLatch); - + server.restart(); - - timing.awaitLatch(removeLatch); + + timing.awaitLatch(removeLatch); } finally { @@ -617,7 +622,7 @@ public class TestWatchesBuilder extends CuratorTestBase @Test(groups = CuratorTestBase.zk36Group) public void testPersistentRecursiveWatch() throws Exception { - try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) { client.start(); client.blockUntilConnected(); @@ -647,7 +652,7 @@ public class TestWatchesBuilder extends CuratorTestBase }; return new ZooKeeper(connectString, sessionTimeout, actualWatcher); }; - try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() ) + try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build()) { client.start(); client.blockUntilConnected(); @@ -664,51 +669,59 @@ public class TestWatchesBuilder extends CuratorTestBase } } - private static class CountDownWatcher implements 
Watcher { + private static class CountDownWatcher implements Watcher + { private String path; private EventType eventType; private CountDownLatch removeLatch; - - public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) { + + public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) + { this.path = path; this.eventType = eventType; - this.removeLatch = removeLatch; + this.removeLatch = removeLatch; } - + @Override public void process(WatchedEvent event) { - if(event.getPath() == null || event.getType() == null) { + if ( event.getPath() == null || event.getType() == null ) + { return; } - - if(event.getPath().equals(path) && event.getType() == eventType) { + + if ( event.getPath().equals(path) && event.getType() == eventType ) + { removeLatch.countDown(); } - } + } } - - private static class BooleanWatcher implements Watcher { + + private static class BooleanWatcher implements Watcher + { private String path; private EventType eventType; private AtomicBoolean removedFlag; - - public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) { + + public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) + { this.path = path; this.eventType = eventType; - this.removedFlag = removedFlag; + this.removedFlag = removedFlag; } - + @Override public void process(WatchedEvent event) { - if(event.getPath() == null || event.getType() == null) { + if ( event.getPath() == null || event.getType() == null ) + { return; } - - if(event.getPath().equals(path) && event.getType() == eventType) { + + if ( event.getPath().equals(path) && event.getType() == eventType ) + { removedFlag.set(true); } - } - } + } + } }
