This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 55938084e46950578b2466b7dfe689dd2a77d3c4
Author: randgalt <[email protected]>
AuthorDate: Wed Oct 2 23:24:14 2019 -0500

    Added support for a PersistentWatcher recipe based on new persistent watch APIs
---
 .../imps/AddPersistentWatchBuilderImpl.java        | 169 ---------------------
 .../framework/recipes/watch/PersistentWatcher.java | 134 ++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  86 +++++++++++
 3 files changed, 220 insertions(+), 169 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
deleted file mode 100644
index acb70c8..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.curator.framework.imps; - -import org.apache.curator.RetryLoop; -import org.apache.curator.drivers.OperationTrace; -import org.apache.curator.framework.api.AddPersistentWatchBuilder; -import org.apache.curator.framework.api.AddPersistentWatchBuilder2; -import org.apache.curator.framework.api.AddPersistentWatchable; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.Pathable; -import org.apache.zookeeper.Watcher; -import java.util.concurrent.Executor; - -public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> -{ - private final CuratorFrameworkImpl client; - private Watching watching = null; - private Backgrounding backgrounding = new Backgrounding(); - private boolean recursive = false; - - AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client) - { - this.client = client; - } - - public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive) - { - this.client = client; - this.watching = watching; - this.backgrounding = backgrounding; - this.recursive = recursive; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground() - { - backgrounding = new Backgrounding(); - return this; - } - - @Override - public AddPersistentWatchBuilder2 recursive() - { - recursive = true; - return this; - } - - @Override - public Pathable<Void> usingWatcher(Watcher watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public Pathable<Void> usingWatcher(CuratorWatcher watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(Object context) - { - 
backgrounding = new Backgrounding(context); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) - { - backgrounding = new Backgrounding(callback); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) - { - backgrounding = new Backgrounding(callback, context); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) - { - backgrounding = new Backgrounding(callback, executor); - return this; - } - - @Override - public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) - { - backgrounding = new Backgrounding(client, callback, context, executor); - return this; - } - - @Override - public Void forPath(String path) throws Exception - { - if ( backgrounding.inBackground() ) - { - client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); - } - else - { - pathInForeground(path); - } - return null; - } - - @Override - public void performBackgroundOperation(final OperationAndData<String> data) throws Exception - { - String path = data.getData(); - String fixedPath = client.fixForNamespace(path); - try - { - final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background"); - client.getZooKeeper().addPersistentWatch - ( - fixedPath, - watching.getWatcher(path), - recursive, (rc, path1, ctx) -> { - trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); - CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); - client.processBackgroundOperation(data, event); - }, - backgrounding.getContext() - ); - } - catch ( Throwable e ) - { 
- backgrounding.checkError(e, watching); - } - } - - private void pathInForeground(final String path) throws Exception - { - final String fixedPath = client.fixForNamespace(path); - OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground"); - RetryLoop.callWithRetry - ( - client.getZookeeperClient(), () -> { - client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive); - return null; - }); - trace.setPath(fixedPath).setWithWatcher(true).commit(); - } -} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java new file mode 100644 index 0000000..2c97490 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A managed persistent watcher. The watch will be managed such that it stays set through + * connection lapses, etc. + */ + public class PersistentWatcher implements Closeable + { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard(); + private final ConnectionStateListener connectionStateListener = (client, newState) -> { + if ( newState.isConnected() ) + { + reset(); + } + }; + private final Watcher watcher = event -> listeners.forEach(w -> w.process(event)); + private final CuratorFramework client; + private final String basePath; + private final boolean recursive; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client client + * @param basePath path to set the watch on + * @param recursive ZooKeeper persistent watches can optionally be recursive + */ + public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + 
this.recursive = recursive; + } + + /** + * Start watching + */ + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + /** + * Remove the watcher + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + listeners.clear(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + try + { + client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.debug(String.format("Could not remove watcher for path: %s", basePath), e); + } + } + } + + /** + * Container for setting listeners + * + * @return listener container + */ + public Listenable<Watcher> getListenable() + { + return listeners; + } + + private void reset() + { + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) { + client.runSafe(this::reset); + } + }; + client.addWatch().withMode(recursive ? 
AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath); + } + catch ( Exception e ) + { + log.error("Could not reset persistent watch at path: " + basePath, e); + } + } + } \ No newline at end of file diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java new file mode 100644 index 0000000..df18de5 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -0,0 +1,86 @@ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPersistentWatcher extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testConnectionLostRecursive() throws Exception + { + internalTest(true); + } + + @Test + public void testConnectionLost() throws Exception + { + internalTest(false); + } + + private void internalTest(boolean recursive) throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + CountDownLatch lostLatch = new CountDownLatch(1); + CountDownLatch reconnectedLatch = new CountDownLatch(1); + 
client.start(); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) { + lostLatch.countDown(); + } else if ( newState == ConnectionState.RECONNECTED ) { + reconnectedLatch.countDown(); + } + }); + + try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) ) + { + persistentWatcher.start(); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + persistentWatcher.getListenable().addListener(events::add); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + if ( recursive ) + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + } + else + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); // child added + } + + server.stop(); + Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + server.restart(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + + timing.sleepABit(); // time to allow watcher to get reset + events.clear(); + + if ( recursive ) + { + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + } + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + } + } +}
