This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch zk36-recipes in repository https://gitbox.apache.org/repos/asf/curator.git
commit 7a2e00241577110fc0525bc751801a134331d843 Author: randgalt <[email protected]> AuthorDate: Sun Nov 3 12:45:21 2019 -0500 Add recipes that use persistent watchers including new cache recipes to replace all others. --- .../framework/recipes/cache/CuratorCache.java | 114 +++++++ .../recipes/cache/CuratorCacheBuilder.java | 63 ++++ .../recipes/cache/CuratorCacheBuilderImpl.java | 74 ++++ .../framework/recipes/cache/CuratorCacheImpl.java | 303 +++++++++++++++++ .../recipes/cache/CuratorCacheListener.java | 78 +++++ .../recipes/cache/CuratorCacheListenerBuilder.java | 129 +++++++ .../cache/CuratorCacheListenerBuilderImpl.java | 161 +++++++++ .../recipes/cache/CuratorCacheStorage.java | 114 +++++++ .../recipes/cache/NodeCacheListenerWrapper.java | 46 +++ .../cache/PathChildrenCacheListenerWrapper.java | 78 +++++ .../recipes/cache/StandardCuratorCacheStorage.java | 88 +++++ .../recipes/cache/TreeCacheListenerWrapper.java | 81 +++++ .../framework/recipes/watch/PersistentWatcher.java | 169 ++++++++++ .../framework/recipes/cache/TestCuratorCache.java | 248 ++++++++++++++ .../recipes/cache/TestCuratorCacheConsistency.java | 373 +++++++++++++++++++++ .../cache/TestCuratorCacheEventOrdering.java | 52 +++ .../recipes/cache/TestCuratorCacheWrappers.java | 162 +++++++++ .../recipes/cache/TestWrappedNodeCache.java | 172 ++++++++++ .../recipes/watch/TestPersistentWatcher.java | 105 ++++++ 19 files changed, 2610 insertions(+) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java new file mode 100644 index 0000000..ebe06e0 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; + +/** + * <p> + * A utility that attempts to keep the data from a node locally cached. Optionally the entire + * tree of children below the node can also be cached. Will respond to update/create/delete events, pull + * down the data, etc. You can register listeners that will get notified when changes occur. + * </p> + * + * <p> + * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. Additionally, always use the version number + * when updating data to avoid overwriting another process' change. + * </p> + */ +public interface CuratorCache extends Closeable +{ + /** + * cache build options + */ + enum Options + { + /** + * Normally the entire tree of nodes starting at the given node are cached. This option + * causes only the given node to be cached (i.e. a single node cache) + */ + SINGLE_NODE_CACHE, + + /** + * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()} + */ + COMPRESSED_DATA, + + /** + * Normally, when the cache is closed via {@link CuratorCache#close()}, the storage is cleared + * via {@link CuratorCacheStorage#clear()}. This option prevents the storage from being cleared. + */ + DO_NOT_CLEAR_ON_CLOSE + } + + /** + * Return a Curator Cache for the given path with the given options using a standard storage instance + * + * @param client Curator client + * @param path path to cache + * @param options any options + * @return cache (note it must be started via {@link #start()} + */ + static CuratorCache build(CuratorFramework client, String path, Options... options) + { + return builder(client, path).withOptions(options).build(); + } + + /** + * Start a Curator Cache builder + * + * @param client Curator client + * @param path path to cache + * @return builder + */ + static CuratorCacheBuilder builder(CuratorFramework client, String path) + { + return new CuratorCacheBuilderImpl(client, path); + } + + /** + * Start the cache. This will cause a complete refresh from the cache's root node and generate + * events for all nodes found, etc. + */ + void start(); + + /** + * Close the cache, stop responding to events, etc. + */ + @Override + void close(); + + /** + * Return the storage instance being used + * + * @return storage + */ + CuratorCacheStorage storage(); + + /** + * Return the listener container so that listeners can be registered to be notified of changes to the cache + * + * @return listener container + */ + Listenable<CuratorCacheListener> listenable(); +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java new file mode 100644 index 0000000..35a5f26 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +public interface CuratorCacheBuilder +{ + /** + * @param options any options + * @return this + */ + CuratorCacheBuilder withOptions(CuratorCache.Options... options); + + /** + * Alternate storage to use. If not specified, {@link StandardCuratorCacheStorage#standard()} is used + * + * @param storage storage instance to use + * @return this + */ + CuratorCacheBuilder withStorage(CuratorCacheStorage storage); + + /** + * By default any unexpected exception is handled by logging the exception. You can change + * so that a handler is called instead. Under normal circumstances, this shouldn't be necessary. + * + * @param exceptionHandler exception handler to use + */ + CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler); + + /** + * Normally, listeners are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)}. Use this + * method to set a different executor. + * + * @param executor to use + */ + CuratorCacheBuilder withExecutor(Executor executor); + + /** + * Return a new Curator Cache based on the builder methods that have been called + * + * @return new Curator Cache + */ + CuratorCache build(); +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java new file mode 100644 index 0000000..9f9e03d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +class CuratorCacheBuilderImpl implements CuratorCacheBuilder +{ + private final CuratorFramework client; + private final String path; + private CuratorCacheStorage storage; + private Consumer<Exception> exceptionHandler; + private Executor executor; + private CuratorCache.Options[] options; + + CuratorCacheBuilderImpl(CuratorFramework client, String path) + { + this.client = client; + this.path = path; + } + + @Override + public CuratorCacheBuilder withOptions(CuratorCache.Options... options) + { + this.options = options; + return this; + } + + @Override + public CuratorCacheBuilder withStorage(CuratorCacheStorage storage) + { + this.storage = storage; + return this; + } + + @Override + public CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler) + { + this.exceptionHandler = exceptionHandler; + return this; + } + + @Override + public CuratorCacheBuilder withExecutor(Executor executor) + { + this.executor = executor; + return this; + } + + @Override + public CuratorCache build() + { + return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler); + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java new file mode 100644 index 0000000..6349530 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.recipes.watch.PersistentWatcher; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*; +import static org.apache.zookeeper.KeeperException.Code.NONODE; +import static org.apache.zookeeper.KeeperException.Code.OK; + +class CuratorCacheImpl implements CuratorCache +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final PersistentWatcher persistentWatcher; + private final CuratorFramework client; + private final CuratorCacheStorage storage; + private final String path; + private final boolean recursive; + private final boolean compressedData; + private final boolean clearOnClose; + private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); + private final Consumer<Exception> exceptionHandler; + private final Executor executor; + + private volatile AtomicLong outstandingOps; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler) + { + Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); + this.client = client; + this.storage = (storage != null) ? storage : CuratorCacheStorage.standard(); + this.path = path; + recursive = !options.contains(Options.SINGLE_NODE_CACHE); + compressedData = options.contains(Options.COMPRESSED_DATA); + clearOnClose = !options.contains(Options.DO_NOT_CLEAR_ON_CLOSE); + persistentWatcher = new PersistentWatcher(client, path, recursive); + persistentWatcher.getListenable().addListener(this::processEvent); + persistentWatcher.getResetListenable().addListener(this::rebuild); + this.exceptionHandler = (exceptionHandler != null) ? exceptionHandler : e -> log.error("CuratorCache error", e); + this.executor = (executor != null) ? executor : client::runSafe; + } + + @Override + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + outstandingOps = new AtomicLong(0); + persistentWatcher.start(); + } + + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + persistentWatcher.close(); + if ( clearOnClose ) + { + storage.clear(); + } + } + } + + @Override + public CuratorCacheStorage storage() + { + return storage; + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return listenerManager; + } + + private void rebuild() + { + if ( state.get() != State.STARTED ) + { + return; + } + + nodeChanged(path); + storage.stream() + .map(ChildData::getPath) + .filter(p -> !p.equals(path)) + .forEach(this::nodeChanged); + } + + private void processEvent(WatchedEvent event) + { + if ( state.get() != State.STARTED ) + { + return; + } + + switch ( event.getType() ) + { + case NodeDataChanged: + case NodeCreated: + { + nodeChanged(event.getPath()); + break; + } + + case NodeDeleted: + { + removeStorage(event.getPath()); + break; + } + + case NodeChildrenChanged: + { + nodeChildrenChanged(event.getPath()); + break; + } + } + } + + private void nodeChildrenChanged(String fromPath) + { + if ( (state.get() != State.STARTED) || !recursive ) + { + return; + } + + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() == OK.intValue() ) + { + event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child))); + } + else if ( event.getResultCode() == NONODE.intValue() ) + { + removeStorage(event.getPath()); + } + else + { + handleException(event); + } + checkDecrementOutstandingOps(); + }; + + checkIncrementOutstandingOps(); + client.getChildren().inBackground(callback).forPath(fromPath); + } + catch ( Exception e ) + { + handleException(e); + } + } + + private void nodeChanged(String fromPath) + { + if ( state.get() != State.STARTED ) + { + return; + } + + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() == OK.intValue() ) + { + putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); + nodeChildrenChanged(event.getPath()); + } + else if ( event.getResultCode() == NONODE.intValue() ) + { + removeStorage(event.getPath()); + } + else + { + handleException(event); + } + checkDecrementOutstandingOps(); + }; + + checkIncrementOutstandingOps(); + if ( compressedData ) + { + client.getData().decompressed().inBackground(callback).forPath(fromPath); + } + else + { + client.getData().inBackground(callback).forPath(fromPath); + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + public static volatile boolean JZTEST = false; + + private void putStorage(ChildData data) + { + Optional<ChildData> previousData = storage.put(data); + if ( previousData.isPresent() ) + { + if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() ) + { + callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data)); + } + } + else + { + callListeners(l -> l.event(NODE_CREATED, null, data)); + } + } + + private void removeStorage(String path) + { + storage.remove(path).ifPresent(previousData -> callListeners(l -> l.event(NODE_DELETED, previousData, null))); + } + + private void callListeners(Consumer<CuratorCacheListener> proc) + { + if ( state.get() == State.STARTED ) + { + executor.execute(() -> listenerManager.forEach(proc)); + } + } + + private void handleException(CuratorEvent event) + { + handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode()))); + } + + private void handleException(Exception e) + { + ThreadUtils.checkInterrupted(e); + exceptionHandler.accept(e); + } + + private void checkIncrementOutstandingOps() + { + AtomicLong localOutstandingOps = outstandingOps; + if ( localOutstandingOps != null ) + { + localOutstandingOps.incrementAndGet(); + } + } + + private void checkDecrementOutstandingOps() + { + AtomicLong localOutstandingOps = outstandingOps; + if ( localOutstandingOps != null ) + { + if ( localOutstandingOps.decrementAndGet() == 0 ) + { + outstandingOps = null; + callListeners(CuratorCacheListener::initialized); + } + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java new file mode 100644 index 0000000..620e471 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +/** + * Listener for {@link CuratorCache} events. The main functional interface is general purpose + * but you can build event specific listeners, etc. using the builder. Note: all listeners + * are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)} when called. + */ +@FunctionalInterface +public interface CuratorCacheListener +{ + /** + * An enumerated type that describes a change + */ + enum Type + { + /** + * A new node was added to the cache + */ + NODE_CREATED, + + /** + * A node already in the cache has changed + */ + NODE_CHANGED, + + /** + * A node already in the cache was deleted + */ + NODE_DELETED + } + + /** + * Called when a data is created, changed or deleted. + * + * @param type the type of event + * @param oldData the old data or null + * @param data the new data or null + */ + void event(Type type, ChildData oldData, ChildData data); + + /** + * When the cache is started, the initial nodes are tracked and when they are finished loading + * into the cache this method is called. + */ + default void initialized() + { + // NOP + } + + /** + * Returns a builder allowing type specific, and special purpose listeners. + * + * @return builder + */ + static CuratorCacheListenerBuilder builder() + { + return new CuratorCacheListenerBuilderImpl(); + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java new file mode 100644 index 0000000..c57e881 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type; +import java.util.function.Consumer; + +public interface CuratorCacheListenerBuilder +{ + /** + * Add a standard listener + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forAll(CuratorCacheListener listener); + + /** + * Add a listener only for {@link Type#NODE_CREATED} + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener); + + @FunctionalInterface + interface ChangeListener + { + void event(ChildData oldNode, ChildData node); + } + + /** + * Add a listener only for {@link Type#NODE_CHANGED} + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forChanges(ChangeListener listener); + + /** + * Add a listener only both {@link Type#NODE_CREATED} and {@link Type#NODE_CHANGED} + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener); + + /** + * Add a listener only for {@link Type#NODE_DELETED} + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener); + + /** + * Add a listener only for {@link CuratorCacheListener#initialized()} + * + * @param listener listener to add + * @return this + */ + CuratorCacheListenerBuilder forInitialized(Runnable listener); + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s + * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache + * does not register the listener with the connection state listener container. Also note that CuratorCache + * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so + * things such as event ordering will likely be different. + * + * @param client the curator client + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener); + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s + * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache + * does not register the listener with the connection state listener container. Also note that CuratorCache + * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so + * things such as event ordering will likely be different. + * + * @param client the curator client + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener); + + /** + * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s + * with CuratorCache. + * + * @param listener the listener to wrap + * @return a CuratorCacheListener that forwards to the given listener + */ + CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener); + + /** + * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called. + * i.e. changes that occur as the cache is initializing are not sent to the listener + */ + CuratorCacheListenerBuilder afterInitialized(); + + /** + * Build and return a new listener based on the methods that have been previously called + * + * @return new listener + */ + CuratorCacheListener build(); +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java new file mode 100644 index 0000000..4873868 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder +{ + private final List<CuratorCacheListener> listeners = new ArrayList<>(); + private boolean afterInitializedOnly = false; + + @Override + public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener) + { + listeners.add(listener); + return this; + } + + @Override + public CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener) + { + listeners.add((type, oldNode, node) -> { + if ( type == CuratorCacheListener.Type.NODE_CREATED ) + { + listener.accept(node); + } + }); + return this; + } + + @Override + public CuratorCacheListenerBuilder forChanges(ChangeListener listener) + { + listeners.add((type, oldNode, node) -> { + if ( type == CuratorCacheListener.Type.NODE_CHANGED ) + { + listener.event(oldNode, node); + } + }); + return this; + } + + @Override + public CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener) + { + listeners.add((type, oldNode, node) -> { + if ( (type == CuratorCacheListener.Type.NODE_CHANGED) || (type == CuratorCacheListener.Type.NODE_CREATED) ) + { + listener.event(oldNode, node); + } + }); + return this; + } + + @Override + public CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener) + { + listeners.add((type, oldNode, node) -> { + if ( type == CuratorCacheListener.Type.NODE_DELETED ) + { + listener.accept(oldNode); + } + }); + return this; + } + + @Override + public CuratorCacheListenerBuilder forInitialized(Runnable listener) + { + CuratorCacheListener localListener = new CuratorCacheListener() + { + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + // NOP + } + + @Override + public void initialized() + { + listener.run(); + } + }; + listeners.add(localListener); + return this; + } + + @Override + public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener) + { + listeners.add(new PathChildrenCacheListenerWrapper(client, listener)); + return this; + } + + @Override + public CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener) + { + listeners.add(new TreeCacheListenerWrapper(client, listener)); + return this; + } + + @Override + public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener) + { + listeners.add(new NodeCacheListenerWrapper(listener)); + return this; + } + + @Override + public CuratorCacheListenerBuilder afterInitialized() + { + afterInitializedOnly = true; + return this; + } + + @Override + public CuratorCacheListener build() + { + List<CuratorCacheListener> copy = new ArrayList<>(listeners); + return new CuratorCacheListener() + { + private volatile boolean isInitialized = !afterInitializedOnly; + + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + if ( isInitialized ) + { + copy.forEach(l -> l.event(type, oldData, data)); + } + } + + @Override + public void initialized() + { + isInitialized = true; + copy.forEach(CuratorCacheListener::initialized); + } + }; + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java new file mode 100644 index 0000000..f3d870d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Interface for maintaining data in a {@link CuratorCache} + */ +public interface CuratorCacheStorage +{ + /** + * Return a new standard storage instance + * + * @return storage instance + */ + static CuratorCacheStorage standard() { + return new StandardCuratorCacheStorage(true); + } + + /** + * Return a new storage instance that does not retain the data bytes. i.e. ChildData objects + * returned by this storage will always return {@code null} for {@link ChildData#getData()}. + * + * @return storage instance that does not retain data bytes + */ + static CuratorCacheStorage bytesNotCached() { + return new StandardCuratorCacheStorage(false); + } + + /** + * Add an entry to storage and return any previous entry at that path + * + * @param data entry to add + * @return previous entry or {@code empty()} + */ + Optional<ChildData> put(ChildData data); + + /** + * Remove the entry from storage and return any previous entry at that path + * + * @param path path to remove + * @return previous entry or {@code empty()} + */ + Optional<ChildData> remove(String path); + + /** + * Return an entry from storage + * + * @param path path to get + * @return entry or {@code empty()} + */ + Optional<ChildData> get(String path); + + /** + * Return the current number of entries in storage + * + * @return number of entries + */ + int size(); + + /** + * Return a stream over the storage entries. Note: for a standard storage instance, the stream + * behaves like a stream returned by {@link java.util.concurrent.ConcurrentHashMap#entrySet()} + * + * @return stream over entries + */ + Stream<ChildData> stream(); + + /** + * Return a stream over the storage entries that are the immediate children of the given node. + * + * @return stream over entries + */ + Stream<ChildData> streamImmediateChildren(String fromParent); + + /** + * Utility - given a stream of child nodes, build a map. Note: it is assumed that each child + * data in the stream has a unique path + * + * @param stream stream of child nodes with unique paths + * @return map + */ + static Map<String, ChildData> toMap(Stream<ChildData> stream) + { + return stream.map(data -> new AbstractMap.SimpleEntry<>(data.getPath(), data)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * Reset the storage to zero entries + */ + void clear(); +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java new file mode 100644 index 0000000..468049b --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; + +class NodeCacheListenerWrapper implements CuratorCacheListener +{ + private final NodeCacheListener listener; + + NodeCacheListenerWrapper(NodeCacheListener listener) + { + this.listener = listener; + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + try + { + listener.nodeChanged(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java new file mode 100644 index 0000000..a9123c1 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; + +class PathChildrenCacheListenerWrapper implements CuratorCacheListener +{ + private final PathChildrenCacheListener listener; + private final CuratorFramework client; + + PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener) + { + this.listener = listener; + this.client = client; + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + switch ( type ) + { + case NODE_CREATED: + { + sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED); + break; + } + + case NODE_CHANGED: + { + sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED); + break; + } + + case NODE_DELETED: + { + sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED); + break; + } + } + } + + @Override + public void initialized() + { + sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED); + } + + private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type) + { + PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node); + try + { + listener.childEvent(client, event); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java new file mode 100644 index 0000000..bbdb21d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.utils.ZKPaths; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +class StandardCuratorCacheStorage implements CuratorCacheStorage +{ + private final Map<String, ChildData> dataMap; + private final boolean cacheBytes; + + StandardCuratorCacheStorage(boolean cacheBytes) + { + this.dataMap = new ConcurrentHashMap<>(); + this.cacheBytes = cacheBytes; + } + + @Override + public Optional<ChildData> put(ChildData data) + { + ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null); + return Optional.ofNullable(dataMap.put(data.getPath(), localData)); + } + + @Override + public Optional<ChildData> remove(String path) + { + return Optional.ofNullable(dataMap.remove(path)); + } + + @Override + public Optional<ChildData> get(String path) + { + return Optional.ofNullable(dataMap.get(path)); + } + + @Override + public int size() + { + return dataMap.size(); + } + + @Override + public Stream<ChildData> stream() + { + return dataMap.values().stream(); + } + + @Override + public Stream<ChildData> streamImmediateChildren(String fromParent) + { + return dataMap.entrySet() + .stream() + .filter(entry -> { + ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(entry.getKey()); + return pathAndNode.getPath().equals(fromParent); + }) + .map(Map.Entry::getValue); + } + + @Override + public void clear() + { + dataMap.clear(); + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java new file mode 100644 index 0000000..570799b --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +class TreeCacheListenerWrapper implements CuratorCacheListener +{ + private final CuratorFramework client; + private final TreeCacheListener listener; + + TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener) + { + this.client = client; + this.listener = listener; + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + switch ( type ) + { + case NODE_CREATED: + { + sendEvent(data, TreeCacheEvent.Type.NODE_ADDED); + break; + } + + case NODE_CHANGED: + { + sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED); + break; + } + + case NODE_DELETED: + { + sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED); + break; + } + } + } + + @Override + public void initialized() + { + sendEvent(null, TreeCacheEvent.Type.INITIALIZED); + } + + private void sendEvent(ChildData node, TreeCacheEvent.Type type) + { + TreeCacheEvent event = new TreeCacheEvent(type, node); + try + { + listener.childEvent(client, event); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java new file mode 100644 index 0000000..87ecb6e --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A managed persistent watcher. The watch will be managed such that it stays set through + * connection lapses, etc. + */ +public class PersistentWatcher implements Closeable +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard(); + private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard(); + private final ConnectionStateListener connectionStateListener = (client, newState) -> { + if ( newState.isConnected() ) + { + reset(); + } + }; + private final Watcher watcher = event -> listeners.forEach(w -> w.process(event)); + private final CuratorFramework client; + private final String basePath; + private final boolean recursive; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client client + * @param basePath path to set the watch on + * @param recursive ZooKeeper persistent watches can optionally be recursive + */ + public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + this.recursive = recursive; + } + + /** + * Start watching + */ + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + /** + * Remove the watcher + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + listeners.clear(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + try + { + client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.debug(String.format("Could not remove watcher for path: %s", basePath), e); + } + } + } + + /** + * Container for setting listeners + * + * @return listener container + */ + public Listenable<Watcher> getListenable() + { + return listeners; + } + + /** + * Listeners are called when the persistent watcher has been successfully registered + * or re-registered after a connection disruption + * + * @return listener container + */ + public StandardListenerManager<Runnable> getResetListenable() + { + return resetListeners; + } + + private void reset() + { + try + { + BackgroundCallback callback = (__, event) -> { + if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) + { + reset(); + } + else + { + resetListeners.forEach(Runnable::run); + } + }; + client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath); + } + catch ( Exception e ) + { + log.error("Could not reset persistent watch at path: " + basePath, e); + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java new file mode 100644 index 0000000..8560f87 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCache extends CuratorTestBase +{ + @Test + public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster + { + try (TestingCluster cluster = new TestingCluster(3)) + { + cluster.start(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + cache.start(); + + CountDownLatch reconnectLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectLatch.countDown(); + } + }); + CountDownLatch latch = new CountDownLatch(3); + cache.listenable().addListener((__, ___, ____) -> latch.countDown()); + + client.create().forPath("/test/one"); + client.create().forPath("/test/two"); + client.create().forPath("/test/three"); + + Assert.assertTrue(timing.awaitLatch(latch)); + + InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + cluster.killServer(connectionInstance); + + Assert.assertTrue(timing.awaitLatch(reconnectLatch)); + + timing.sleepABit(); + + Assert.assertEquals(cache.storage().stream().count(), 4); + } + } + } + } + + @Test + public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache + { + CuratorCacheStorage storage = new StandardCuratorCacheStorage(false); + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + final CountDownLatch updatedLatch = new CountDownLatch(1); + final CountDownLatch addedLatch = new CountDownLatch(1); + client.create().creatingParentsIfNeeded().forPath("/test"); + try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build()) + { + cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build()); + cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build()); + cache.start(); + + client.create().forPath("/test/foo", "first".getBytes()); + Assert.assertTrue(timing.awaitLatch(addedLatch)); + + client.setData().forPath("/test/foo", "something new".getBytes()); + Assert.assertTrue(timing.awaitLatch(updatedLatch)); + } + } + } + + @Test + public void testAfterInitialized() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test"); + client.create().creatingParentsIfNeeded().forPath("/test/one"); + client.create().creatingParentsIfNeeded().forPath("/test/one/two"); + client.create().creatingParentsIfNeeded().forPath("/test/one/two/three"); + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + CountDownLatch initializedLatch = new CountDownLatch(1); + AtomicInteger eventCount = new AtomicInteger(0); + CuratorCacheListener listener = new CuratorCacheListener() + { + @Override + public void event(Type type, ChildData oldData, ChildData data) + { + eventCount.incrementAndGet(); + } + + @Override + public void initialized() + { + initializedLatch.countDown(); + } + }; + cache.listenable().addListener(builder().forAll(listener).afterInitialized().build()); + cache.start(); + Assert.assertTrue(timing.awaitLatch(initializedLatch)); + + Assert.assertEquals(initializedLatch.getCount(), 0); + Assert.assertEquals(cache.storage().size(), 4); + Assert.assertTrue(cache.storage().get("/test").isPresent()); + Assert.assertTrue(cache.storage().get("/test/one").isPresent()); + Assert.assertTrue(cache.storage().get("/test/one/two").isPresent()); + Assert.assertTrue(cache.storage().get("/test/one/two/three").isPresent()); + } + } + } + + @Test + public void testListenerBuilder() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + Semaphore all = new Semaphore(0); + Semaphore deletes = new Semaphore(0); + Semaphore changes = new Semaphore(0); + Semaphore creates = new Semaphore(0); + Semaphore createsAndChanges = new Semaphore(0); + + CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build(); + cache.listenable().addListener(listener); + cache.start(); + + client.create().forPath("/test"); + Assert.assertTrue(timing.acquireSemaphore(all, 1)); + Assert.assertTrue(timing.acquireSemaphore(creates, 1)); + Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1)); + Assert.assertEquals(changes.availablePermits(), 0); + Assert.assertEquals(deletes.availablePermits(), 0); + + client.setData().forPath("/test", "new".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(all, 1)); + Assert.assertTrue(timing.acquireSemaphore(changes, 1)); + Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1)); + Assert.assertEquals(creates.availablePermits(), 0); + Assert.assertEquals(deletes.availablePermits(), 0); + + client.delete().forPath("/test"); + Assert.assertTrue(timing.acquireSemaphore(all, 1)); + Assert.assertTrue(timing.acquireSemaphore(deletes, 1)); + Assert.assertEquals(creates.availablePermits(), 0); + Assert.assertEquals(changes.availablePermits(), 0); + Assert.assertEquals(createsAndChanges.availablePermits(), 0); + } + } + } + + @Test + public void testOverrideExecutor() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + CountDownLatch latch = new CountDownLatch(2); + Executor executor = proc -> { + latch.countDown(); + proc.run(); + }; + try ( CuratorCache cache = CuratorCache.builder(client, "/test").withExecutor(executor).build() ) + { + cache.listenable().addListener((type, oldData, data) -> latch.countDown()); + cache.start(); + + client.create().forPath("/test"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + } + + @Test + public void testClearOnClose() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + CuratorCacheStorage storage; + client.start(); + + try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() ) + { + cache.start(); + storage = cache.storage(); + + client.create().forPath("/test", "foo".getBytes()); + client.create().forPath("/test/bar", "bar".getBytes()); + timing.sleepABit(); + } + Assert.assertEquals(storage.size(), 2); + + try ( CuratorCache cache = CuratorCache.build(client, "/test") ) + { + cache.start(); + storage = cache.storage(); + + timing.sleepABit(); + } + Assert.assertEquals(storage.size(), 0); + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java new file mode 100644 index 0000000..ab9dbe4 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; + +/** + * Randomly create nodes in a tree while a set of CuratorCaches listens. Afterwards, validate + * that the caches contain the same values as ZK itself + */ +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCacheConsistency extends CuratorTestBase +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final ThreadLocalRandom random = ThreadLocalRandom.current(); + + private static final Duration testLength = Duration.ofSeconds(30); + private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3); + private static final Duration sleepLength = Duration.ofMillis(5); + private static final int nodesPerLevel = 10; + private static final int clusterSize = 5; + private static final int maxServerKills = 2; + + private static final String BASE_PATH = "/test"; + + private class Client implements Closeable + { + private final CuratorFramework client; + private final CuratorCache cache; + private final int index; + private final Map<String, ChildData> listenerDataMap = new HashMap<>(); + + Client(int index, String connectionString, AtomicReference<Exception> errorSignal) + { + this.index = index; + client = buildClient(connectionString); + cache = CuratorCache.builder(client, BASE_PATH).withOptions(DO_NOT_CLEAR_ON_CLOSE).withExceptionHandler(errorSignal::set).build(); + + // listenerDataMap is a local data map that will hold values sent by listeners + // this way, the listener code can be tested for validity and consistency + CuratorCacheListener listener = builder().forCreates(node -> { + ChildData previous = listenerDataMap.put(node.getPath(), node); + if ( previous != null ) + { + errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath()))); + } + }).forChanges((oldNode, node) -> { + ChildData previous = listenerDataMap.put(node.getPath(), node); + if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) ) + { + errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath()))); + } + }).forDeletes(node -> { + ChildData previous = listenerDataMap.remove(node.getPath()); + if ( previous == null ) + { + errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath()))); + } + }).build(); + cache.listenable().addListener(listener); + } + + void start() + { + client.start(); + cache.start(); + } + + @Override + public void close() + { + cache.close(); + client.close(); + } + } + + @Test + public void testConsistencyAfterSimulation() throws Exception + { + int clientQty = random.nextInt(10, 20); + int maxDepth = random.nextInt(5, 10); + + log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth); + + List<Client> clients = Collections.emptyList(); + Map<String, String> actualTree; + + AtomicReference<Exception> errorSignal = new AtomicReference<>(); + try (TestingCluster cluster = new TestingCluster(clusterSize)) + { + cluster.start(); + + initializeBasePath(cluster); + try + { + clients = buildClients(cluster, clientQty, errorSignal); + workLoop(cluster, clients, maxDepth, errorSignal); + + log.info("Test complete - sleeping to allow events to complete"); + timing.sleepABit(); + } + finally + { + clients.forEach(Client::close); + } + + actualTree = buildActual(cluster); + } + + log.info("client qty: {}", clientQty); + + Map<Integer, List<String>> errorsList = clients.stream() + .map(client -> findErrors(client, actualTree)) + .filter(errorsEntry -> !errorsEntry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if ( !errorsList.isEmpty() ) + { + log.error("{} clients had errors", errorsList.size()); + errorsList.forEach((index, errorList) -> { + log.error("Client {}", index); + errorList.forEach(log::error); + log.error(""); + }); + + Assert.fail("Errors found"); + } + } + + // build a data map recursively from the actual values in ZK + private Map<String, String> buildActual(TestingCluster cluster) + { + Map<String, String> actual = new HashMap<>(); + try (CuratorFramework client = buildClient(cluster.getConnectString())) + { + client.start(); + buildActual(client, actual, BASE_PATH); + } + return actual; + } + + private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath) + { + try + { + byte[] bytes = client.getData().forPath(fromPath); + actual.put(fromPath, new String(bytes)); + client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child))); + } + catch ( Exception e ) + { + Assert.fail("", e); + } + } + + private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal) + { + return IntStream.range(0, clientQty) + .mapToObj(index -> new Client(index, cluster.getConnectString(), errorSignal)) + .peek(Client::start) + .collect(Collectors.toList()); + } + + private void initializeBasePath(TestingCluster cluster) throws Exception + { + try (CuratorFramework client = buildClient(cluster.getConnectString())) + { + client.start(); + client.create().forPath(BASE_PATH, "".getBytes()); + } + } + + private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception + { + Instant start = Instant.now(); + Instant lastServerKill = Instant.now(); + int serverKillIndex = 0; + while ( true ) + { + Duration elapsed = Duration.between(start, Instant.now()); + if ( elapsed.compareTo(testLength) >= 0 ) + { + break; + } + + Exception errorSignalException = errorSignal.get(); + if ( errorSignalException != null ) + { + Assert.fail("A client's error handler was called", errorSignalException); + } + + Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now()); + if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 ) + { + lastServerKill = Instant.now(); + if ( serverKillIndex < maxServerKills ) + { + doKillServer(cluster, serverKillIndex++); + } + } + + int thisDepth = random.nextInt(0, maxDepth); + String thisPath = randomPath(thisDepth); + CuratorFramework client = randomClient(clients); + if ( random.nextBoolean() ) + { + doDelete(client, thisPath); + } + else + { + doChange(client, thisPath); + } + + Thread.sleep(sleepLength.toMillis()); + } + } + + private void doChange(CuratorFramework client, String thisPath) + { + try + { + String thisData = Long.toString(random.nextLong()); + client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes()); + } + catch ( Exception e ) + { + Assert.fail("Could not create/set: " + thisPath); + } + } + + private void doDelete(CuratorFramework client, String thisPath) + { + if ( thisPath.equals(BASE_PATH) ) + { + return; + } + try + { + client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath); + } + catch ( Exception e ) + { + Assert.fail("Could not delete: " + thisPath); + } + } + + private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception + { + log.info("Killing server {}", serverKillIndex); + InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex); + cluster.killServer(killSpec); + } + + private CuratorFramework randomClient(List<Client> clients) + { + return clients.get(random.nextInt(clients.size())).client; + } + + private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree) + { + CuratorCacheStorage storage = client.cache.storage(); + List<String> errors = new ArrayList<>(); + if ( tree.size() != storage.size() ) + { + errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size())); + } + tree.keySet().forEach(path -> { + if ( !storage.get(path).isPresent() ) + { + errors.add(String.format("Path %s in master but not client", path)); + } + }); + storage.stream().forEach(data -> { + String treeValue = tree.get(data.getPath()); + if ( treeValue != null ) + { + if ( !treeValue.equals(new String(data.getData())) ) + { + errors.add(String.format("Data at %s is not the same", data.getPath())); + } + + ChildData listenersMapData = client.listenerDataMap.get(data.getPath()); + if ( listenersMapData == null ) + { + errors.add(String.format("listenersMap missing data at: %s", data.getPath())); + } + else if ( !treeValue.equals(new String(listenersMapData.getData())) ) + { + errors.add(String.format("Data at %s in listenersMap is not the same", data.getPath())); + } + } + else + { + errors.add(String.format("Path %s in client but not master", data.getPath())); + } + }); + + client.listenerDataMap.keySet().forEach(path -> { + if ( !storage.get(path).isPresent() ) + { + errors.add(String.format("Path %s in listenersMap but not storage", path)); + } + }); + + return new AbstractMap.SimpleEntry<>(client.index, errors); + } + + private String randomPath(int depth) + { + StringBuilder str = new StringBuilder(BASE_PATH); + while ( depth-- > 0 ) + { + int levelNodeName = random.nextInt(nodesPerLevel); + str.append("/").append(levelNodeName); + } + return str.toString(); + } + + @Override + protected void createServer() + { + // do nothing - we'll be using TestingCluster instead + } + + private CuratorFramework buildClient(String connectionString) + { + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100); + return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy); + } +} \ No newline at end of file diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java new file mode 100644 index 0000000..8baf2e2 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache> +{ + @Override + protected int getActualQty(CuratorCache cache) + { + return cache.storage().size(); + } + + @Override + protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events) + { + CuratorCache cache = CuratorCache.build(client, path); + cache.listenable().addListener((type, oldNode, node) -> { + if ( type == CuratorCacheListener.Type.NODE_CREATED ) + { + events.add(new Event(EventType.ADDED, node.getPath())); + } + else if ( type == CuratorCacheListener.Type.NODE_DELETED ) + { + events.add(new Event(EventType.DELETED, oldNode.getPath())); + } + }); + cache.start(); + return cache; + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java new file mode 100644 index 0000000..4a75acf --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.ImmutableSet; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; +import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCacheWrappers extends CuratorTestBase +{ + @Test + public void testPathChildrenCache() throws Exception // copied from TestPathChildrenCache#testBasics() + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().forPath("/test"); + + final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>(); + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + PathChildrenCacheListener listener = (__, event) -> { + if ( event.getData().getPath().equals("/test/one") ) + { + events.offer(event.getType()); + } + }; + cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build()); + cache.start(); + + client.create().forPath("/test/one", "hey there".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + + client.setData().forPath("/test/one", "sup!".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!"); + + client.delete().forPath("/test/one"); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + } + } + } + + @Test + public void testTreeCache() throws Exception // copied from TestTreeCache#testBasics() + { + BaseTestTreeCache treeCacheBase = new BaseTestTreeCache(); + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + cache.listenable().addListener(builder().forTreeCache(client, treeCacheBase.eventListener).build()); + cache.start(); + + treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + treeCacheBase.assertEvent(TreeCacheEvent.Type.INITIALIZED); + Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0); + Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(), 0); + + client.create().forPath("/test/one", "hey there".getBytes()); + treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one")); + Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "hey there"); + Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 0); + Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(), 0); + + client.setData().forPath("/test/one", "sup!".getBytes()); + treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one"); + Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one")); + Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!"); + + client.delete().forPath("/test/one"); + treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes()); + Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of()); + } + } + } + + @Test + public void testNodeCache() throws Exception // copied from TestNodeCache#testBasics() + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + client.start(); + client.create().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test/node", SINGLE_NODE_CACHE)) + { + Supplier<ChildData> getRootData = () -> cache.storage().get("/test/node").orElseThrow(() -> new AssertionError("is not present")); + cache.start(); + + final Semaphore semaphore = new Semaphore(0); + cache.listenable().addListener(builder().forNodeCache(semaphore::release).build()); + try + { + getRootData.get(); + Assert.fail("Should have thrown"); + } + catch ( AssertionError expected ) + { + // expected + } + + client.create().forPath("/test/node", "a".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(getRootData.get().getData(), "a".getBytes()); + + client.setData().forPath("/test/node", "b".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(getRootData.get().getData(), "b".getBytes()); + + client.delete().forPath("/test/node"); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + try + { + getRootData.get(); + Assert.fail("Should have thrown"); + } + catch ( AssertionError expected ) + { + // expected + } + } + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java new file mode 100644 index 0000000..3e81b63 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.Compatibility; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; + +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestWrappedNodeCache extends CuratorTestBase +{ + @Test + public void testDeleteThenCreate() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes()); + + final Semaphore semaphore = new Semaphore(0); + cache = CuratorCache.build(client, "/test/foo"); + NodeCacheListener listener = semaphore::release; + cache.listenable().addListener(builder().forNodeCache(listener).build()); + + Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/foo"); + + cache.start(); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + + Assert.assertTrue(rootData.get().isPresent()); + Assert.assertEquals(rootData.get().get().getData(), "one".getBytes()); + + client.delete().forPath("/test/foo"); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + client.create().forPath("/test/foo", "two".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + + Assert.assertTrue(rootData.get().isPresent()); + Assert.assertEquals(rootData.get().get().getData(), "two".getBytes()); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testKilledSession() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = null; + try + { + client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes()); + + CountDownLatch lostLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + }); + + cache = CuratorCache.build(client,"/test/node"); + + Semaphore latch = new Semaphore(0); + NodeCacheListener listener = latch::release; + cache.listenable().addListener(builder().forNodeCache(listener).build()); + + Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node"); + + cache.start(); + Assert.assertTrue(timing.acquireSemaphore(latch)); + + Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper()); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + Assert.assertTrue(rootData.get().isPresent()); + Assert.assertEquals(rootData.get().get().getData(), "start".getBytes()); + + client.setData().forPath("/test/node", "new data".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(latch)); + Assert.assertTrue(rootData.get().isPresent()); + Assert.assertEquals(rootData.get().get().getData(), "new data".getBytes()); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } + + @SuppressWarnings("ConstantConditions") + @Test + public void testBasics() throws Exception + { + CuratorCache cache = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + client.create().forPath("/test"); + + cache = CuratorCache.build(client, "/test/node"); + cache.start(); + + Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node"); + + final Semaphore semaphore = new Semaphore(0); + NodeCacheListener listener = semaphore::release; + cache.listenable().addListener(builder().forNodeCache(listener).build()); + + Assert.assertNull(rootData.get().orElse(null)); + + client.create().forPath("/test/node", "a".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(rootData.get().orElse(null).getData(), "a".getBytes()); + + client.setData().forPath("/test/node", "b".getBytes()); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertEquals(rootData.get().orElse(null).getData(), "b".getBytes()); + + client.delete().forPath("/test/node"); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + Assert.assertNull(rootData.get().orElse(null)); + } + finally + { + CloseableUtils.closeQuietly(cache); + TestCleanState.closeAndTestClean(client); + } + } + + private Supplier<Optional<ChildData>> getRootDataProc(CuratorCache cache, String rootPath) + { + return () -> cache.storage().get(rootPath); + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java new file mode 100644 index 0000000..534c365 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPersistentWatcher extends CuratorTestBase +{ + @Test + public void testConnectionLostRecursive() throws Exception + { + internalTest(true); + } + + @Test + public void testConnectionLost() throws Exception + { + internalTest(false); + } + + private void internalTest(boolean recursive) throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + CountDownLatch lostLatch = new CountDownLatch(1); + CountDownLatch reconnectedLatch = new CountDownLatch(1); + client.start(); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + else if ( newState == ConnectionState.RECONNECTED ) + { + reconnectedLatch.countDown(); + } + }); + + try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) ) + { + persistentWatcher.start(); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + persistentWatcher.getListenable().addListener(events::add); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + if ( recursive ) + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + } + else + { + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); // child added + } + + server.stop(); + Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + + server.restart(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + + timing.sleepABit(); // time to allow watcher to get reset + events.clear(); + + if ( recursive ) + { + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + } + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + } + } +} \ No newline at end of file
