This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-recipe in repository https://gitbox.apache.org/repos/asf/curator.git
commit 306846871cdf7e7c42aeb96bd54e14ad98348c6d Author: randgalt <[email protected]> AuthorDate: Wed Oct 2 23:24:14 2019 -0500 Added support for a PersistentWatcher recipe based on new persistent watch APIs --- .../framework/recipes/watch/PersistentWatcher.java | 135 +++++++++++++++++++++ .../recipes/watch/TestPersistentWatcher.java | 86 +++++++++++++ 2 files changed, 221 insertions(+) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java new file mode 100644 index 0000000..3fc05c9 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.AddPersistentWatchBuilder2; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A managed persistent watcher. The watch will be managed such that it stays set through + * connection lapses, etc. + */ + public class PersistentWatcher implements Closeable + { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard(); + private final ConnectionStateListener connectionStateListener = (client, newState) -> { + if ( newState.isConnected() ) + { + reset(); + } + }; + private final Watcher watcher = event -> listeners.forEach(w -> w.process(event)); + private final CuratorFramework client; + private final String basePath; + private final boolean recursive; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client client + * @param basePath path to set the watch on + * @param recursive ZooKeeper persistent watches can optionally be recursive + */ + public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + this.recursive = recursive; + } + + /** + * Start watching + */ + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + /** + * Remove the watcher + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + listeners.clear(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + try + { + client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.debug(String.format("Could not remove watcher for path: %s", basePath), e); + } + } + } + + /** + * Container for setting listeners + * + * @return listener container + */ + public Listenable<Watcher> getListenable() + { + return listeners; + } + + private void reset() + { + try + { + AddPersistentWatchBuilder2 builder = recursive ? client.addPersistentWatch().recursive() : client.addPersistentWatch(); + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) { + client.runSafe(this::reset); + } + }; + builder.inBackground(callback).usingWatcher(watcher).forPath(basePath); + } + catch ( Exception e ) + { + log.error("Could not reset persistent watch at path: " + basePath, e); + } + } + } \ No newline at end of file diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java new file mode 100644 index 0000000..df18de5 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -0,0 +1,86 @@ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPersistentWatcher extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testConnectionLostRecursive() throws Exception + { + internalTest(true); + } + + @Test + public void testConnectionLost() throws Exception + { + internalTest(false); + } + + private void internalTest(boolean recursive) throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + CountDownLatch lostLatch = new CountDownLatch(1); + CountDownLatch reconnectedLatch = new CountDownLatch(1); + client.start(); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) { + lostLatch.countDown(); + } else if ( newState == ConnectionState.RECONNECTED ) { + reconnectedLatch.countDown(); + } + }); + + try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) ) + { + persistentWatcher.start(); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + persistentWatcher.getListenable().addListener(events::add); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + if ( recursive ) + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + } + else + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); // child added + } + + server.stop(); + Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + server.restart(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + + timing.sleepABit(); // time to allow watcher to get reset + events.clear(); + + if ( recursive ) + { + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + } + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + } + } +}
