This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-cache in repository https://gitbox.apache.org/repos/asf/curator.git
commit a83efbff780f12df5f78d4fce5a49225963e1dfd Author: randgalt <[email protected]> AuthorDate: Mon Oct 7 15:48:57 2019 +0300 wip --- .../framework/recipes/cache/CuratorCache.java | 141 +++++++++ .../framework/recipes/cache/CuratorCacheImpl.java | 273 ++++++++++++++++++ .../recipes/cache/CuratorCacheListener.java | 141 +++++++++ .../recipes/cache/CuratorCacheStorage.java | 117 ++++++++ .../curator/framework/recipes/cache/NodeCache.java | 3 + .../recipes/cache/NodeCacheListenerWrapper.java | 47 +++ .../framework/recipes/cache/PathChildrenCache.java | 3 + .../cache/PathChildrenCacheListenerWrapper.java | 71 +++++ .../recipes/cache/StandardCuratorCacheStorage.java | 98 +++++++ .../curator/framework/recipes/cache/TreeCache.java | 3 + .../recipes/cache/TreeCacheListenerWrapper.java | 71 +++++ .../framework/recipes/watch/PersistentWatcher.java | 39 ++- .../framework/recipes/cache/TestCuratorCache.java | 112 ++++++++ .../recipes/cache/TestCuratorCacheConsistency.java | 317 +++++++++++++++++++++ .../recipes/cache/TestWrappedNodeCache.java | 165 +++++++++++ .../recipes/watch/TestPersistentWatcher.java | 20 ++ .../org/apache/curator/test/TestingCluster.java | 2 +- .../x/async/api/AsyncPersistentWatchBuilder.java | 18 ++ .../details/AsyncPersistentWatchBuilderImpl.java | 75 ----- 19 files changed, 1638 insertions(+), 78 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java new file mode 100644 index 0000000..44465c8 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; +import java.util.Optional; +import java.util.function.Consumer; + +/** + * <p> + * A utility that attempts to keep the data from a node locally cached. Optionally the entire + * tree of children below the node can also be cached. Will respond to update/create/delete events, pull + * down the data, etc. You can register listeners that will get notified when changes occur. + * </p> + * + * <p> + * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. Additionally, always use the version number + * when updating data to avoid overwriting another process' change. 
+ * </p> + */ +public interface CuratorCache extends Closeable +{ + /** + * cache build options + */ + enum Options + { + /** + * Cache the entire tree of nodes starting at the given node + */ + RECURSIVE, + + /** + * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()} + */ + COMPRESSED_DATA + } + + /** + * Return a Curator cache for the given path with the given options using a standard storage instance + * + * @param client Curator client + * @param path path to cache + * @param options any options + * @return cache (note it must be started via {@link #start()}) + */ + static CuratorCache build(CuratorFramework client, String path, Options... options) + { + return build(client, CuratorCacheStorage.standard(), path, options); + } + + /** + * Return a Curator cache for the given path with the given options and the given storage instance + * + * @param client Curator client + * @param storage storage to use + * @param path path to cache + * @param options any options + * @return cache (note it must be started via {@link #start()}) + */ + static CuratorCache build(CuratorFramework client, CuratorCacheStorage storage, String path, Options... options) + { + return new CuratorCacheImpl(client, storage, path, options); + } + + /** + * Start the cache. This will cause a complete refresh from the cache's root node and generate + * events for all nodes found, etc. + */ + void start(); + + /** + * Close the cache, stop responding to events, etc. Note: also calls {@link CuratorCacheStorage#close()} + */ + @Override + void close(); + + /** + * Utility to force a rebuild of the cache. Normally, this should not ever be needed + */ + void forceRebuild(); + + /** + * Return the storage instance being used + * + * @return storage + */ + CuratorCacheStorage storage(); + + /** + * Return the root node being cached (i.e.
the node passed to the builder) + * + * @return root node path + */ + String getRootPath(); + + /** + * Convenience to return the root node data + * + * @return data (if it's in the cache) + */ + default Optional<ChildData> getRootData() + { + return storage().get(getRootPath()); + } + + /** + * Return the listener container so that listeners can be registered to be notified of changes to the cache + * + * @return listener container + */ + Listenable<CuratorCacheListener> listenable(); + + /** + * By default any unexpected exception is handled by logging the exception. You can change this + * so that a handler is called instead. Under normal circumstances, this shouldn't be necessary. + * + * @param newHandler new exception handler or {@code null} to reset to the default logging + */ + void setExceptionHandler(Consumer<Exception> newHandler); +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java new file mode 100644 index 0000000..a31f841 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.recipes.watch.PersistentWatcher; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.apache.zookeeper.KeeperException.Code.NONODE; +import static org.apache.zookeeper.KeeperException.Code.OK; + +class CuratorCacheImpl implements CuratorCache +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final PersistentWatcher persistentWatcher; + private final CuratorFramework client; + private final CuratorCacheStorage storage; + private final String path; + private final boolean recursive; + private final boolean compressedData; + private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); + private volatile Consumer<Exception> exceptionHandler; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options... optionsArg) + { + Set<Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet(); + this.client = client; + this.storage = storage; + this.path = path; + this.recursive = options.contains(Options.RECURSIVE); + this.compressedData = options.contains(Options.COMPRESSED_DATA); + persistentWatcher = new PersistentWatcher(client, path, recursive); + persistentWatcher.getListenable().addListener(this::processEvent); + persistentWatcher.getResetListenable().addListener(this::forceRebuild); + setExceptionHandler(null); + } + + @Override + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + persistentWatcher.start(); + } + + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + persistentWatcher.close(); + storage.close(); + } + } + + @Override + public void forceRebuild() + { + if ( state.get() != State.STARTED ) + { + return; + } + + nodeChanged(path); + storage.stream() + .map(ChildData::getPath) + .filter(p -> !p.equals(path)) + .forEach(this::nodeChanged); + } + + @Override + public String getRootPath() + { + return path; + } + + @Override + public CuratorCacheStorage storage() + { + return storage; + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return listenerManager; + } + + @Override + public void setExceptionHandler(Consumer<Exception> newHandler) + { + this.exceptionHandler = (newHandler != null) ? 
newHandler : e -> log.error("CuratorCache error", e); + } + + private void processEvent(WatchedEvent event) + { + if ( state.get() != State.STARTED ) + { + return; + } + + switch ( event.getType() ) + { + case NodeDataChanged: + case NodeCreated: + { + nodeChanged(event.getPath()); + break; + } + + case NodeDeleted: + { + removeStorage(event.getPath()); + break; + } + + case NodeChildrenChanged: + { + nodeChildrenChanged(event.getPath()); + break; + } + } + } + + private void nodeChildrenChanged(String fromPath) + { + if ( (state.get() != State.STARTED) || !recursive ) + { + return; + } + + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() == OK.intValue() ) + { + event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child))); + } + else if ( event.getResultCode() == NONODE.intValue() ) + { + removeStorage(event.getPath()); + } + else + { + handleException(event); + } + }; + client.getChildren().inBackground(callback).forPath(fromPath); + } + catch ( Exception e ) + { + handleException(e); + } + } + + private void nodeChanged(String fromPath) + { + if ( state.get() != State.STARTED ) + { + return; + } + + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() == OK.intValue() ) + { + putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); + nodeChildrenChanged(event.getPath()); + } + else if ( event.getResultCode() == NONODE.intValue() ) + { + removeStorage(event.getPath()); + } + else + { + handleException(event); + } + }; + if ( compressedData ) + { + client.getData().decompressed().inBackground(callback).forPath(fromPath); + } + else + { + client.getData().inBackground(callback).forPath(fromPath); + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + private void putStorage(ChildData data) + { + Optional<ChildData> previousData = storage.put(data); + if ( previousData.isPresent() ) + { + if ( previousData.get().getStat().getMzxid() != 
data.getStat().getMzxid() ) + { + callListeners(l -> l.nodeChanged(previousData.get(), data)); + } + } + else + { + callListeners(l -> l.nodeChanged(null, data)); + } + } + + private void removeStorage(String path) + { + storage.remove(path).ifPresent(previousData -> callListeners(l -> l.nodeChanged(previousData, null))); + } + + private void callListeners(Consumer<CuratorCacheListener> proc) + { + if ( state.get() == State.STARTED ) + { + client.runSafe(() -> listenerManager.forEach(proc)); + } + } + + private void handleException(CuratorEvent event) + { + handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode()))); + } + + private void handleException(Exception e) + { + ThreadUtils.checkInterrupted(e); + exceptionHandler.accept(e); + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java new file mode 100644 index 0000000..e26ac92 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Listener for {@link CuratorCache} events + */ +@SuppressWarnings("deprecation") +@FunctionalInterface +public interface CuratorCacheListener +{ + enum Type + { + /** + * A new node was added to the cache + */ + NODE_CREATED, + + /** + * A node already in the cache has changed + */ + NODE_CHANGED, + + /** + * A node already in the cache was deleted + */ + NODE_DELETED + ; + + public static Type get(ChildData oldNode, ChildData node) + { + if ( (oldNode != null) && (node != null) ) + { + return NODE_CHANGED; + } + else if ( oldNode != null ) + { + return NODE_DELETED; + } + return NODE_CREATED; + } + } + + /** + * Called when a node is created, changes or is deleted. You can get a descriptive + * type for the operation by calling {@link org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type#get(ChildData, ChildData)} + * with the {@code oldNode} and {@code node} arguments + * + * @param oldNode the old node or null + * @param node the new node or null + */ + void nodeChanged(ChildData oldNode, ChildData node); + + /** + * Utility - wrap the given listener so that it's only called when an event of any of the given + * types is received + * + * @param listener listener + * @param types types to listen for + * @return wrapped listener + */ + static CuratorCacheListener forTypes(CuratorCacheListener listener, Type... types) + { + Set<Type> typeSet = new HashSet<>(Arrays.asList(types)); + return (oldNode, node) -> { + if ( typeSet.contains(Type.get(oldNode, node)) ) + { + listener.nodeChanged(oldNode, node); + } + }; + } + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s + * with CuratorCache.
IMPORTANT: the connection state methods in the listener will never be called as CuratorCache + * does not register the listener with the connection state listener container. Also note that CuratorCache + * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so + * things such as event ordering will likely be different. + * + * @param client the curator client + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + static CuratorCacheListener wrap(CuratorFramework client, PathChildrenCacheListener listener) + { + return new PathChildrenCacheListenerWrapper(listener, client); + } + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s + * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache + * does not register the listener with the connection state listener container. Also note that CuratorCache + * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so + * things such as event ordering will likely be different. + * + * @param client the curator client + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + static CuratorCacheListener wrap(CuratorFramework client, TreeCacheListener listener) + { + return new TreeCacheListenerWrapper(client, listener); + } + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s + * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache + * does not register the listener with the connection state listener container. 
Also note that CuratorCache + * behaves differently than {@link org.apache.curator.framework.recipes.cache.NodeCache} so + * things such as event ordering will likely be different. + * + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + static CuratorCacheListener wrap(NodeCacheListener listener) + { + return new NodeCacheListenerWrapper(listener); + } + +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java new file mode 100644 index 0000000..ca41d15 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import java.io.Closeable; +import java.util.Optional; +import java.util.stream.Stream; + +/** + * Interface for maintaining data in a {@link org.apache.curator.framework.recipes.cache.CuratorCache} + */ +public interface CuratorCacheStorage extends Closeable +{ + /** + * Return a new standard storage instance + * + * @return storage instance + */ + static CuratorCacheStorage standard() { + return new StandardCuratorCacheStorage(true); + } + + /** + * Return a new storage instance that does not retain the data bytes. i.e. ChildData objects + * returned by this storage will always return {@code null} for {@link ChildData#getData()}. + * + * @return storage instance that does not retain data bytes + */ + static CuratorCacheStorage bytesNotCached() { + return new StandardCuratorCacheStorage(false); + } + + /** + * Returns a new copy of this storage that does not clear its internal data when it is closed. + * Useful for retaining the cache after it is closed. 
+ * + * @return new copy that does not clear on close + */ + CuratorCacheStorage doNotClearOnClose(); + + /** + * Add an entry to storage and return any previous entry at that path + * + * @param data entry to add + * @return previous entry or {@code empty()} + */ + Optional<ChildData> put(ChildData data); + + /** + * Remove the entry from storage and return any previous entry at that path + * + * @param path path to remove + * @return previous entry or {@code empty()} + */ + Optional<ChildData> remove(String path); + + /** + * Return an entry from storage + * + * @param path path to get + * @return entry or {@code empty()} + */ + Optional<ChildData> get(String path); + + /** + * Return true if the storage currently has an entry for the given path + * + * @param path path to check + * @return true/false + */ + boolean containsPath(String path); + + /** + * Return the current number of entries in storage + * + * @return number of entries + */ + int size(); + + /** + * Return a stream over the storage entries. Note: for a standard storage instance, the stream + * behaves like a stream over {@link java.util.concurrent.ConcurrentHashMap#values()} + * + * @return stream over entries + */ + Stream<ChildData> stream(); + + /** + * Reset the storage to zero entries + */ + void clear(); + + /** + * Close the storage. For a standard storage instance, the storage is cleared.
+ */ + @Override + default void close() + { + clear(); + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java index 9687e1b..f730f78 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java @@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated Use {@link CuratorCache} */ +@Deprecated public class NodeCache implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java new file mode 100644 index 0000000..89360aa --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +public class NodeCacheListenerWrapper implements CuratorCacheListener +{ + private final NodeCacheListener listener; + + public NodeCacheListenerWrapper(NodeCacheListener listener) + { + this.listener = listener; + } + + @Override + public void nodeChanged(ChildData oldNode, ChildData node) + { + callListener(); + } + + private void callListener() + { + try + { + listener.nodeChanged(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index bdc73cc..eb37936 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. 
Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated Use {@link CuratorCache} */ @SuppressWarnings("NullableProblems") +@Deprecated public class PathChildrenCache implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java new file mode 100644 index 0000000..0393204 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; + +public class PathChildrenCacheListenerWrapper implements CuratorCacheListener +{ + private final PathChildrenCacheListener listener; + private final CuratorFramework client; + + public PathChildrenCacheListenerWrapper(PathChildrenCacheListener listener, CuratorFramework client) + { + this.listener = listener; + this.client = client; + } + + @Override + public void nodeChanged(ChildData oldData, ChildData node) + { + switch ( CuratorCacheListener.Type.get(oldData, node) ) + { + case NODE_CREATED: + { + sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED); + break; + } + + case NODE_CHANGED: + { + sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED); + break; + } + + case NODE_DELETED: + { + sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED); + break; + } + } + } + + private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type) + { + PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node); + try + { + listener.childEvent(client, event); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java new file mode 100644 index 0000000..5049770 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +class StandardCuratorCacheStorage implements CuratorCacheStorage +{ + private final Map<String, ChildData> dataMap; + private final boolean cacheBytes; + + StandardCuratorCacheStorage(boolean cacheBytes) + { + this(new ConcurrentHashMap<>(), cacheBytes); + } + + @Override + public CuratorCacheStorage doNotClearOnClose() + { + Map<String, ChildData> copyMap = new ConcurrentHashMap<>(dataMap); + return new StandardCuratorCacheStorage(copyMap, cacheBytes) + { + @Override + public void close() + { + // NOP + } + }; + } + + @Override + public Optional<ChildData> put(ChildData data) + { + ChildData localData = cacheBytes ? 
data : new ChildData(data.getPath(), data.getStat(), null); + return Optional.ofNullable(dataMap.put(data.getPath(), localData)); + } + + @Override + public Optional<ChildData> remove(String path) + { + return Optional.ofNullable(dataMap.remove(path)); + } + + @Override + public Optional<ChildData> get(String path) + { + return Optional.ofNullable(dataMap.get(path)); + } + + @Override + public boolean containsPath(String path) + { + return dataMap.containsKey(path); + } + + @Override + public int size() + { + return dataMap.size(); + } + + @Override + public Stream<ChildData> stream() + { + return dataMap.values().stream(); + } + + @Override + public void clear() + { + dataMap.clear(); + } + + private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, boolean cacheBytes) + { + this.dataMap = dataMap; + this.cacheBytes = cacheBytes; + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index f42c1d5..e321eba 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -71,7 +71,10 @@ import static org.apache.curator.utils.PathUtils.validatePath; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. 
Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated Use {@link CuratorCache} */ +@Deprecated public class TreeCache implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java new file mode 100644 index 0000000..0c3a75b --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; + +public class TreeCacheListenerWrapper implements CuratorCacheListener +{ + private final CuratorFramework client; + private final TreeCacheListener listener; + + public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener) + { + this.client = client; + this.listener = listener; + } + + @Override + public void nodeChanged(ChildData oldData, ChildData node) + { + switch ( CuratorCacheListener.Type.get(oldData, node) ) + { + case NODE_CREATED: + { + sendEvent(node, TreeCacheEvent.Type.NODE_ADDED); + break; + } + + case NODE_CHANGED: + { + sendEvent(node, TreeCacheEvent.Type.NODE_UPDATED); + break; + } + + case NODE_DELETED: + { + sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED); + break; + } + } + } + + private void sendEvent(ChildData node, TreeCacheEvent.Type type) + { + TreeCacheEvent event = new TreeCacheEvent(type, node); + try + { + listener.childEvent(client, event); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index 2c97490..40174fa 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -1,4 +1,22 @@ /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; private final Logger log = LoggerFactory.getLogger(getClass()); private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard(); + private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard(); private final ConnectionStateListener connectionStateListener = (client, newState) -> { if ( newState.isConnected() ) { @@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference; return listeners; } + /** + * Listeners are called when the persistent watcher has been successfully registered + * or re-registered after a connection disruption + * + * @return listener container + */ + public StandardListenerManager<Runnable> getResetListenable() + { + return resetListeners; + } + private void reset() { try { BackgroundCallback callback = (__, event) -> { - if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) { - client.runSafe(this::reset); + if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) + { + reset(); + } + else + { + resetListeners.forEach(Runnable::run); } }; client.addWatch().withMode(recursive ? 
AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java new file mode 100644 index 0000000..3ffec11 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED; + +@Test(groups = Zk35MethodInterceptor.zk35Group) +public class TestCuratorCache extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster + { + try ( TestingCluster cluster = new TestingCluster(3) ) + { + cluster.start(); + + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE)) + { + cache.start(); + + CountDownLatch reconnectLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectLatch.countDown(); + } + }); + CountDownLatch latch = new CountDownLatch(3); + cache.listenable().addListener((__, ___) -> latch.countDown()); + + 
client.create().forPath("/test/one"); + client.create().forPath("/test/two"); + client.create().forPath("/test/three"); + + Assert.assertTrue(timing.awaitLatch(latch)); + + InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + cluster.killServer(connectionInstance); + + Assert.assertTrue(timing.awaitLatch(reconnectLatch)); + + timing.sleepABit(); + + Assert.assertEquals(cache.storage().stream().count(), 4); + } + } + } + } + + @Test + public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache + { + CuratorCacheStorage storage = new StandardCuratorCacheStorage(false); + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + client.start(); + final CountDownLatch updatedLatch = new CountDownLatch(1); + final CountDownLatch addedLatch = new CountDownLatch(1); + client.create().creatingParentsIfNeeded().forPath("/test"); + try (CuratorCache cache = CuratorCache.build(client, storage, "/test", RECURSIVE)) + { + cache.listenable().addListener(CuratorCacheListener.forTypes((__, ___) -> updatedLatch.countDown(), NODE_CHANGED)); + cache.listenable().addListener(CuratorCacheListener.forTypes((__, ___) -> addedLatch.countDown(), NODE_CREATED)); + cache.start(); + + client.create().forPath("/test/foo", "first".getBytes()); + Assert.assertTrue(timing.awaitLatch(addedLatch)); + + client.setData().forPath("/test/foo", "something new".getBytes()); + Assert.assertTrue(timing.awaitLatch(updatedLatch)); + } + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java new file mode 100644 index 0000000..7a6c964 --- /dev/null +++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; +import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE; + +@Test(groups = Zk35MethodInterceptor.zk35Group) +public class TestCuratorCacheConsistency extends BaseClassForTests +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final ThreadLocalRandom random = ThreadLocalRandom.current(); + + private static final Timing2 timing = new Timing2(); + private static final Duration testLength = Duration.ofSeconds(30); + private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3); + private static final Duration sleepLength = Duration.ofMillis(5); + private static final int nodesPerLevel = 10; + private static final int clusterSize = 5; + private static final int maxServerKills = 2; + + private static final String BASE_PATH = "/test"; + + private static class Client implements Closeable + { + private final CuratorFramework client; + private final CuratorCache cache; + private final int index; + + Client(int index, String connectionString) + { + this.index = index; + client = buildClient(connectionString); + cache = CuratorCache.build(client, CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE); + } + + void start() + { + client.start(); + cache.start(); + } + + @Override + public void close() + { + cache.close(); + client.close(); + } + } + + @Test + public void testConsistencyAfterSimulation() throws Exception + { + int clientQty = random.nextInt(10, 20); + int maxDepth = random.nextInt(5, 10); + + log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth); + + List<Client> clients = Collections.emptyList(); + Map<String, String> actualTree; + + try ( TestingCluster cluster = new TestingCluster(clusterSize) ) + { + cluster.start(); + + initializeBasePath(cluster); + try + { + clients = buildClients(cluster, clientQty); + workLoop(cluster, clients, maxDepth); + + log.info("Test complete - sleeping to allow events to complete"); + timing.sleepABit(); + } + finally + { + 
clients.forEach(Client::close); + } + + actualTree = buildActual(cluster); + } + + log.info("client qty: {}", clientQty); + + Map<Integer, List<String>> errorsList = clients.stream() + .map(client -> findErrors(client.index, actualTree, client.cache.storage())) + .filter(errorsEntry -> !errorsEntry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if ( !errorsList.isEmpty() ) + { + log.error("{} clients had errors", errorsList.size()); + errorsList.forEach((index, errorList) -> { + log.error("Client {}", index); + errorList.forEach(log::error); + log.error(""); + }); + + Assert.fail("Errors found"); + } + } + + private Map<String, String> buildActual(TestingCluster cluster) + { + Map<String, String> actual = new HashMap<>(); + try ( CuratorFramework client = buildClient(cluster.getConnectString()) ) + { + client.start(); + buildActual(client, actual, BASE_PATH); + } + return actual; + } + + private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath) + { + try + { + byte[] bytes = client.getData().forPath(fromPath); + actual.put(fromPath, new String(bytes)); + client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child))); + } + catch ( Exception e ) + { + Assert.fail("", e); + } + } + + private List<Client> buildClients(TestingCluster cluster, int clientQty) + { + return IntStream.range(0, clientQty) + .mapToObj(index -> new Client(index, cluster.getConnectString())) + .peek(Client::start) + .collect(Collectors.toList()); + } + + private void initializeBasePath(TestingCluster cluster) throws Exception + { + try ( CuratorFramework client = buildClient(cluster.getConnectString()) ) + { + client.start(); + client.create().forPath(BASE_PATH, "".getBytes()); + } + } + + private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth) throws Exception + { + Instant start = Instant.now(); + Instant lastServerKill = 
Instant.now(); + int serverKillIndex = 0; + while ( true ) + { + Duration elapsed = Duration.between(start, Instant.now()); + if ( elapsed.compareTo(testLength) >= 0 ) + { + break; + } + + Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now()); + if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 ) + { + lastServerKill = Instant.now(); + if ( serverKillIndex < maxServerKills ) + { + doKillServer(cluster, serverKillIndex++); + } + } + + int thisDepth = random.nextInt(0, maxDepth); + String thisPath = buildPath(thisDepth); + CuratorFramework client = randomClient(clients); + if ( random.nextBoolean() ) + { + doDelete(client, thisPath); + } + else + { + doChange(client, thisPath); + } + + Thread.sleep(sleepLength.toMillis()); + } + } + + private void doChange(CuratorFramework client, String thisPath) + { + try + { + String thisData = Long.toString(random.nextLong()); + client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes()); + } + catch ( Exception e ) + { + Assert.fail("Could not create/set: " + thisPath); + } + } + + private void doDelete(CuratorFramework client, String thisPath) + { + if ( thisPath.equals(BASE_PATH) ) + { + return; + } + try + { + client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath); + } + catch ( Exception e ) + { + Assert.fail("Could not delete: " + thisPath); + } + } + + private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception + { + log.info("Killing server {}", serverKillIndex); + InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex); + cluster.killServer(killSpec); + } + + private CuratorFramework randomClient(List<Client> clients) + { + return clients.get(random.nextInt(clients.size())).client; + } + + private Map.Entry<Integer, List<String>> findErrors(int index, Map<String, String> tree, CuratorCacheStorage storage) + { + List<String> errors = new ArrayList<>(); + if ( tree.size() 
!= storage.size() ) + { + errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size())); + } + tree.keySet().forEach(path -> { + if ( !storage.containsPath(path) ) + { + errors.add(String.format("Path %s in master but not client", path)); + } + }); + storage.stream().forEach(data -> { + String treeValue = tree.get(data.getPath()); + if ( treeValue != null ) + { + if ( !treeValue.equals(new String(data.getData())) ) + { + errors.add(String.format("Data at %s is not the same", data.getPath())); + } + } + else + { + errors.add(String.format("Path %s in client but not master", data.getPath())); + } + }); + return new AbstractMap.SimpleEntry<>(index, errors); + } + + private String buildPath(int depth) + { + StringBuilder str = new StringBuilder(BASE_PATH); + while ( depth-- > 0 ) + { + int levelNodeName = random.nextInt(nodesPerLevel); + str.append("/").append(levelNodeName); + } + return str.toString(); + } + + @Override + protected void createServer() + { + // do nothing + } + + private static CuratorFramework buildClient(String connectionString) + { + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100); + return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy); + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java new file mode 100644 index 0000000..d595e3c --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.Compatibility; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE; + +@Test(groups = Zk35MethodInterceptor.zk35Group) +public class TestWrappedNodeCache extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testDeleteThenCreate() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes()); + + final Semaphore semaphore = new Semaphore(0); + cache = CuratorCache.build(client, "/test/foo", RECURSIVE); + NodeCacheListener listener = semaphore::release; + cache.listenable().addListener(CuratorCacheListener.wrap(listener)); + + cache.start(); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + + Assert.assertTrue(cache.getRootData().isPresent()); + Assert.assertEquals(cache.getRootData().get().getData(), "one".getBytes()); + + client.delete().forPath("/test/foo"); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + client.create().forPath("/test/foo", "two".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + + Assert.assertTrue(cache.getRootData().isPresent()); + Assert.assertEquals(cache.getRootData().get().getData(), "two".getBytes()); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testKilledSession() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = null; + try + { + client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes()); + + CountDownLatch lostLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + }); + + cache = CuratorCache.build(client,"/test/node", RECURSIVE); + + Semaphore latch = new Semaphore(0); + NodeCacheListener listener = latch::release; + cache.listenable().addListener(CuratorCacheListener.wrap(listener)); + + cache.start(); + Assert.assertTrue(timing.acquireSemaphore(latch)); + + 
Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper()); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + Assert.assertTrue(cache.getRootData().isPresent()); + Assert.assertEquals(cache.getRootData().get().getData(), "start".getBytes()); + + client.setData().forPath("/test/node", "new data".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(latch)); + Assert.assertTrue(cache.getRootData().isPresent()); + Assert.assertEquals(cache.getRootData().get().getData(), "new data".getBytes()); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testBasics() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + client.create().forPath("/test"); + + cache = CuratorCache.build(client, "/test/node", RECURSIVE); + cache.start(); + + final Semaphore semaphore = new Semaphore(0); + NodeCacheListener listener = semaphore::release; + cache.listenable().addListener(CuratorCacheListener.wrap(listener)); + + Assert.assertNull(cache.getRootData().orElse(null)); + + client.create().forPath("/test/node", "a".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(cache.getRootData().orElse(null).getData(), "a".getBytes()); + + client.setData().forPath("/test/node", "b".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(cache.getRootData().orElse(null).getData(), "b".getBytes()); + + client.delete().forPath("/test/node"); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertNull(cache.getRootData().orElse(null)); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } +} diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java index df18de5..b3d5701 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.curator.framework.recipes.watch; import org.apache.curator.framework.CuratorFramework; @@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.testng.Assert; @@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +@Test(groups = Zk35MethodInterceptor.zk35Group) public class TestPersistentWatcher extends BaseClassForTests { private final Timing2 timing = new Timing2(); diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java index 3d38fe1..58da2c0 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java @@ -225,7 +225,7 @@ public class TestingCluster implements Closeable */ public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception { - Method m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress"); + Method m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress"); m.setAccessible(true); InetSocketAddress address = (InetSocketAddress)m.invoke(client); if ( address != null ) diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java index 0f29233..143a2c8 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java +++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java @@ -1,4 +1,22 @@ /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java deleted file mode 100644 index 14f3e30..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - package org.apache.curator.x.async.details; - - import org.apache.curator.framework.api.AddPersistentWatchable; - import org.apache.curator.framework.api.CuratorWatcher; - import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl; - import org.apache.curator.framework.imps.CuratorFrameworkImpl; - import org.apache.curator.framework.imps.Watching; - import org.apache.curator.x.async.AsyncStage; - import org.apache.curator.x.async.api.AsyncPathable; - import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder; - import org.apache.zookeeper.Watcher; - - import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; - import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; - - class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> - { - private final CuratorFrameworkImpl client; - private final Filters filters; - private Watching watching = null; - private boolean recursive = false; - - AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) - { - this.client = client; - this.filters = filters; - } - - @Override - public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive() - { - recursive = true; - return this; - } - - @Override - public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher 
watcher) - { - watching = new Watching(client, watcher); - return this; - } - - @Override - public AsyncStage<Void> forPath(String path) - { - BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); - AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive); - return safeCall(common.internalCallback, () -> builder.forPath(path)); - } - } \ No newline at end of file
