Repository: curator
Updated Branches:
  refs/heads/persistent-watch 9a05598c4 -> d7bf1a246
1. Persistent watches are now optionally recursive - support this. 2. Added bridge classes to help TreeCache users switch to CuratorCache Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a83e3e0b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a83e3e0b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a83e3e0b Branch: refs/heads/persistent-watch Commit: a83e3e0b5f1a8ea031fcf2cb32f745880ecaa8b3 Parents: 9a05598 Author: randgalt <[email protected]> Authored: Wed Aug 23 08:04:01 2017 +0200 Committer: randgalt <[email protected]> Committed: Wed Aug 23 08:04:01 2017 +0200 ---------------------------------------------------------------------- .../api/AddPersistentWatchBuilder.java | 11 +- .../api/AddPersistentWatchBuilder2.java | 25 + .../imps/AddPersistentWatchBuilderImpl.java | 12 +- .../framework/recipes/cache/ListenerBridge.java | 197 ++++++++ .../framework/recipes/cache/SelectorBridge.java | 55 ++ .../recipes/watch/InternalCuratorCache.java | 2 +- .../recipes/watch/PersistentWatcher.java | 9 +- .../recipes/cache/BaseTestTreeCache.java | 28 +- .../framework/recipes/cache/TestTreeCache.java | 2 +- .../recipes/cache/TestTreeCacheBridge.java | 500 +++++++++++++++++++ .../cache/TestTreeCacheBridgeRandomTree.java | 224 +++++++++ .../recipes/cache/TestTreeCacheRandomTree.java | 2 +- .../org/apache/curator/test/WatchersDebug.java | 9 + 13 files changed, 1064 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java index 4927afc..057919e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java @@ -18,8 +18,13 @@ */ package org.apache.curator.framework.api; -public interface AddPersistentWatchBuilder extends - Backgroundable<AddPersistentWatchable<Pathable<Void>>>, - AddPersistentWatchable<Pathable<Void>> +public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2 { + /** + * ZooKeeper persistent watches can optionally be recursive. See + * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)} + * + * @return this + */ + AddPersistentWatchBuilder2 recursive(); } http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java new file mode 100644 index 0000000..ce1ffed --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface AddPersistentWatchBuilder2 extends + Backgroundable<AddPersistentWatchable<Pathable<Void>>>, + AddPersistentWatchable<Pathable<Void>> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java index bf4dfb6..56f8f79 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.RetryLoop; import org.apache.curator.drivers.OperationTrace; import org.apache.curator.framework.api.AddPersistentWatchBuilder; +import org.apache.curator.framework.api.AddPersistentWatchBuilder2; import org.apache.curator.framework.api.AddPersistentWatchable; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; @@ -37,6 +38,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab private final CuratorFrameworkImpl client; private Watching watching = null; private Backgrounding 
backgrounding = new Backgrounding(); + private boolean recursive = false; AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client) { @@ -51,6 +53,13 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab } @Override + public AddPersistentWatchBuilder2 recursive() + { + recursive = true; + return this; + } + + @Override public Pathable<Void> usingWatcher(Watcher watcher) { watching = new Watching(client, watcher); @@ -125,6 +134,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab ( fixedPath, watching.getWatcher(path), + recursive, new AsyncCallback.VoidCallback() { @Override @@ -156,7 +166,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab @Override public Void call() throws Exception { - client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path)); + client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive); return null; } } http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java new file mode 100644 index 0000000..8a2d665 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java @@ -0,0 +1,197 @@ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.recipes.watch.CacheEvent; +import org.apache.curator.framework.recipes.watch.CacheListener; +import org.apache.curator.framework.recipes.watch.CachedNode; 
+import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p> + * Utility to bridge old TreeCache {@link org.apache.curator.framework.recipes.cache.TreeCacheListener} + * instances with new {@link org.apache.curator.framework.recipes.watch.CacheListener} so that you can + * use existing listeners without rewriting them. + * </p> + * + * <p> + * Create a ListenerBridge from your existing TreeCacheListener. You can then call {@link #add()} + * to add the bridge listener to a CuratorCache. + * </p> + */ +public class ListenerBridge implements CacheListener, ConnectionStateListener +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFramework client; + private final Listenable<CacheListener> listenable; + private final TreeCacheListener listener; + private final AtomicBoolean added = new AtomicBoolean(false); + + /** + * Builder style constructor + * + * @param client the client + * @param listenable CuratorCache listener container + * @param listener the old TreeCacheListener + * @return listener bridge + */ + public static ListenerBridge wrap(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener) + { + return new ListenerBridge(client, listenable, listener); + } + + /** + * @param client the client + * @param listenable CuratorCache listener container + * @param listener the old TreeCacheListener + */ + public ListenerBridge(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.listenable = Objects.requireNonNull(listenable, "listenable cannot be null"); + this.listener = Objects.requireNonNull(listener, "listener cannot be null"); + } + + @Override + public 
void process(CacheEvent event, String path, CachedNode affectedNode) + { + try + { + listener.childEvent(client, toEvent(event, path, affectedNode)); + } + catch ( Exception e ) + { + handleException(e); + } + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + TreeCacheEvent.Type type = toType(newState); + if ( type != null ) + { + try + { + listener.childEvent(client, new TreeCacheEvent(type, null)); + } + catch ( Exception e ) + { + handleException(e); + } + } + } + + /** + * Add this listener to the listener container. Note: this method is not idempotent + */ + public void add() + { + Preconditions.checkState(added.compareAndSet(false, true), "Already added"); + client.getConnectionStateListenable().addListener(this); + listenable.addListener(this); + } + + /** + * Remove this listener from the listener container + */ + public void remove() + { + if ( added.compareAndSet(true, false) ) + { + client.getConnectionStateListenable().removeListener(this); + listenable.removeListener(this); + } + } + + /** + * Utility - convert a new CuratorCache event to an old TreeCache event + * + * @param event event to convert + * @return new value + */ + public static TreeCacheEvent.Type toType(CacheEvent event) + { + switch ( event ) + { + case NODE_CREATED: + return TreeCacheEvent.Type.NODE_ADDED; + + case NODE_DELETED: + return TreeCacheEvent.Type.NODE_REMOVED; + + case NODE_CHANGED: + return TreeCacheEvent.Type.NODE_UPDATED; + + case CACHE_REFRESHED: + return TreeCacheEvent.Type.INITIALIZED; + } + + throw new IllegalStateException("Unknown event: " + event); + } + + /** + * Utility - convert a connection state event to an old TreeCache event + * + * @param state event to convert + * @return new value or null if there is no corresponding TreeCache value + */ + public static TreeCacheEvent.Type toType(ConnectionState state) + { + switch ( state ) + { + case RECONNECTED: + return TreeCacheEvent.Type.CONNECTION_RECONNECTED; + + 
case SUSPENDED: + return TreeCacheEvent.Type.CONNECTION_SUSPENDED; + + case LOST: + return TreeCacheEvent.Type.CONNECTION_LOST; + } + + return null; + } + + /** + * Convert Curator Cache listener values to TreeCache data + * + * @param path the affected path (can be null) + * @param affectedNode the node (can be null) + * @return TreeCache data or null + */ + public static ChildData toData(String path, CachedNode affectedNode) + { + if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) ) + { + return new ChildData(path, affectedNode.getStat(), affectedNode.getData()); + } + return null; + } + + /** + * Generate a TreeCacheEvent from Curator cache event data + * + * @param event event type + * @param path affected path (can be null) + * @param affectedNode affected data (can be null) + * @return event + */ + public static TreeCacheEvent toEvent(CacheEvent event, String path, CachedNode affectedNode) + { + TreeCacheEvent.Type type = toType(event); + ChildData data = (event == CacheEvent.CACHE_REFRESHED) ? 
null : toData(path, affectedNode); + return new TreeCacheEvent(type, data); + } + + protected void handleException(Exception e) + { + log.error("Unhandled exception in listener", e); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java new file mode 100644 index 0000000..0c6af08 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java @@ -0,0 +1,55 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.recipes.watch.CacheAction; +import org.apache.curator.framework.recipes.watch.CacheSelector; +import java.util.Objects; + +/** + * Utility to bridge an old TreeCacheSelector to a new CuratorCache selector + */ +public class SelectorBridge implements CacheSelector +{ + private final TreeCacheSelector selector; + private final CacheAction action; + + /** + * Builder style constructor + * + * @param selector the old TreeCacheSelector to bridge + * @return bridged selector + */ + public static SelectorBridge wrap(TreeCacheSelector selector) + { + return new SelectorBridge(selector); + } + + /** + * @param selector the old TreeCacheSelector to bridge + */ + public SelectorBridge(TreeCacheSelector selector) + { + this(selector, CacheAction.STAT_AND_DATA); + } + + /** + * @param selector the old TreeCacheSelector to bridge + * @param action value to return for active paths + */ + public SelectorBridge(TreeCacheSelector selector, CacheAction action) + { + this.selector = Objects.requireNonNull(selector, "selector cannot be null"); + this.action = Objects.requireNonNull(action, "action cannot be null"); + } + 
+ @Override + public boolean traverseChildren(String basePath, String fullPath) + { + return selector.traverseChildren(fullPath); + } + + @Override + public CacheAction actionForPath(String basePath, String fullPath) + { + return selector.acceptChild(fullPath) ? action : CacheAction.NOT_STORED; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java index e2b1bf3..edd08b5 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java @@ -71,7 +71,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher this.basePath = Objects.requireNonNull(path, "path cannot be null"); this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null"); this.sortChildren = sortChildren; - watcher = new PersistentWatcher(client, path) + watcher = new PersistentWatcher(client, path, true) { @Override protected void noteWatcherReset() http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index 310478a..3884a69 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.watch; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.AddPersistentWatchBuilder2; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; @@ -83,6 +84,7 @@ public class PersistentWatcher implements Closeable }; private final CuratorFramework client; private final String basePath; + private final boolean recursive; private final BackgroundCallback backgroundCallback = new BackgroundCallback() { @Override @@ -115,11 +117,13 @@ public class PersistentWatcher implements Closeable /** * @param client client * @param basePath path to set the watch on + * @param recursive ZooKeeper persistent watches can optionally be recursive */ - public PersistentWatcher(CuratorFramework client, String basePath) + public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive) { this.client = Objects.requireNonNull(client, "client cannot be null"); this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + this.recursive = recursive; } /** @@ -176,7 +180,8 @@ public class PersistentWatcher implements Closeable { try { - client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath); + AddPersistentWatchBuilder2 builder = recursive ? 
client.addPersistentWatch().recursive() : client.addPersistentWatch(); + builder.inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index b984624..9cbec98 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -23,23 +23,25 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.TestCleanState; +import org.apache.curator.framework.recipes.watch.CuratorCache; +import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import java.io.Closeable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public class BaseTestTreeCache extends BaseClassForTests +public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests { CuratorFramework client; - TreeCache cache; + T cache; 
protected final AtomicBoolean hadBackgroundException = new AtomicBoolean(false); private final BlockingQueue<TreeCacheEvent> events = new LinkedBlockingQueue<TreeCacheEvent>(); private final Timing timing = new Timing(); @@ -86,6 +88,26 @@ public class BaseTestTreeCache extends BaseClassForTests } /** + * Construct a CuratorCache that records exceptions and automatically listens using the bridge. + */ + protected CuratorCache newCacheWithListeners(CuratorFramework client, String path) + { + CuratorCache result = CuratorCacheBuilder.builder(client, path).build(); + ListenerBridge.wrap(client, result.getListenable(), eventListener).add(); + return result; + } + + /** + * Finish constructing a CuratorCache that records exceptions and automatically listens. + */ + protected CuratorCache buildCacheWithListeners(CuratorCacheBuilder builder) + { + CuratorCache result = builder.build(); + ListenerBridge.wrap(client, result.getListenable(), eventListener).add(); + return result; + } + + /** * Finish constructing a TreeCache that records exceptions and automatically listens. 
*/ protected TreeCache buildWithListeners(TreeCache.Builder builder) http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java index ebaf43e..ee5e918 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java @@ -33,7 +33,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public class TestTreeCache extends BaseTestTreeCache +public class TestTreeCache extends BaseTestTreeCache<TreeCache> { @Test public void testSelector() throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java new file mode 100644 index 0000000..049daa5 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.ImmutableSet; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.apache.curator.framework.recipes.watch.CacheSelectors; +import org.apache.curator.framework.recipes.watch.CuratorCache; +import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder; +import org.apache.curator.test.compatibility.KillSession2; +import org.apache.zookeeper.CreateMode; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> +{ + @Test + public void testSelector() throws Exception + { + client.create().forPath("/root"); + client.create().forPath("/root/n1-a"); + client.create().forPath("/root/n1-b"); + client.create().forPath("/root/n1-b/n2-a"); + client.create().forPath("/root/n1-b/n2-b"); + client.create().forPath("/root/n1-b/n2-b/n3-a"); + client.create().forPath("/root/n1-c"); + client.create().forPath("/root/n1-d"); + + TreeCacheSelector selector = new TreeCacheSelector() + { + @Override + public boolean traverseChildren(String fullPath) + { + return !fullPath.equals("/root/n1-b/n2-b"); + } + + @Override + public boolean acceptChild(String fullPath) + { + return !fullPath.equals("/root/n1-c"); + } + }; + cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, 
"/root").withCacheSelector(SelectorBridge.wrap(selector))); + cache.start(); + + assertEvent(Type.NODE_ADDED, "/root"); + assertEvent(Type.NODE_ADDED, "/root/n1-a"); + assertEvent(Type.NODE_ADDED, "/root/n1-b"); + assertEvent(Type.NODE_ADDED, "/root/n1-d"); + assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-a"); + assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-b"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testStartup() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes()); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3")); + Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub")); + Assert.assertNull(cache.get("/test/non_exist")); + } + + @Test + public void testStartEmpty() throws Exception + { + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test"); + assertEvent(Type.NODE_ADDED, "/test"); + assertNoMoreEvents(); + } + + @Test + public void testStartEmptyDeeper() throws Exception + { + cache = newCacheWithListeners(client, "/test/foo/bar"); + cache.start(); + assertEvent(Type.INITIALIZED); + + 
client.create().creatingParentsIfNeeded().forPath("/test/foo"); + assertNoMoreEvents(); + client.create().forPath("/test/foo/bar"); + assertEvent(Type.NODE_ADDED, "/test/foo/bar"); + assertNoMoreEvents(); + } + + @Test + public void testDepth0() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0)); + cache = buildCacheWithListeners(builder); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.get("/test/1")); + Assert.assertNull(cache.get("/test/1")); + Assert.assertNull(cache.get("/test/non_exist")); + } + + @Test + public void testDepth1() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1)); + cache = buildCacheWithListeners(builder); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes()); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3")); + 
Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.get("/test/1/sub")); + Assert.assertNull(cache.get("/test/2/sub")); + Assert.assertNull(cache.get("/test/non_exist")); + } + + @Test + public void testDepth1Deeper() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/foo"); + client.create().forPath("/test/foo/bar"); + client.create().forPath("/test/foo/bar/1", "one".getBytes()); + client.create().forPath("/test/foo/bar/2", "two".getBytes()); + client.create().forPath("/test/foo/bar/3", "three".getBytes()); + client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes()); + + CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1)); + cache = buildCacheWithListeners(builder); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test/foo/bar"); + assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes()); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testAsyncInitialPopulation() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/one"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testFromRoot() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + + cache = newCacheWithListeners(client, "/"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/"); + assertEvent(Type.NODE_ADDED, "/test"); + 
assertEvent(Type.NODE_ADDED, "/test/one"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test")); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + } + + @Test + public void testFromRootWithDepth() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + + CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1)); + cache = buildCacheWithListeners(builder); + cache.start(); + assertEvent(Type.NODE_ADDED, "/"); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test")); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.get("/test/one")); + Assert.assertNull(cache.get("/test/one")); + } + + @Test + public void testWithNamespace() throws Exception + { + client.create().forPath("/outer"); + client.create().forPath("/outer/foo"); + client.create().forPath("/outer/test"); + client.create().forPath("/outer/test/one", "hey there".getBytes()); + + cache = newCacheWithListeners(client.usingNamespace("outer"), "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/one"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + } + + @Test + public void testWithNamespaceAtRoot() throws Exception + { + 
client.create().forPath("/outer"); + client.create().forPath("/outer/foo"); + client.create().forPath("/outer/test"); + client.create().forPath("/outer/test/one", "hey there".getBytes()); + + cache = newCacheWithListeners(client.usingNamespace("outer"), "/"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/"); + assertEvent(Type.NODE_ADDED, "/foo"); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/one"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test")); + Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + } + + @Test + public void testSyncInitialPopulation() throws Exception + { + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/one"); + assertNoMoreEvents(); + } + + @Test + public void testChildrenInitialized() throws Exception + { + client.create().forPath("/test", "".getBytes()); + client.create().forPath("/test/1", "1".getBytes()); + client.create().forPath("/test/2", "2".getBytes()); + client.create().forPath("/test/3", "3".getBytes()); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/1"); + assertEvent(Type.NODE_ADDED, "/test/2"); + assertEvent(Type.NODE_ADDED, "/test/3"); + assertEvent(Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testUpdateWhenNotCachingData() throws Exception + { + 
client.create().forPath("/test"); + + cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly())); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test/foo", "first".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); + + client.setData().forPath("/test/foo", "something new".getBytes()); + assertEvent(Type.NODE_UPDATED, "/test/foo"); + assertNoMoreEvents(); + + Assert.assertNotNull(cache.get("/test/foo")); + // No byte data querying the tree because we're not caching data. + Assert.assertEquals(cache.get("/test/foo").getData().length, 0); + } + + @Test + public void testDeleteThenCreate() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/foo", "one".getBytes()); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.NODE_ADDED, "/test/foo"); + assertEvent(Type.INITIALIZED); + + client.delete().forPath("/test/foo"); + assertEvent(Type.NODE_REMOVED, "/test/foo", "one".getBytes()); + client.create().forPath("/test/foo", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); + + client.delete().forPath("/test/foo"); + assertEvent(Type.NODE_REMOVED, "/test/foo", "two".getBytes()); + client.create().forPath("/test/foo", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); + + assertNoMoreEvents(); + } + + @Test + public void testDeleteThenCreateRoot() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/foo", "one".getBytes()); + + cache = newCacheWithListeners(client, "/test/foo"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test/foo"); + assertEvent(Type.INITIALIZED); + + client.delete().forPath("/test/foo"); + assertEvent(Type.NODE_REMOVED, "/test/foo"); + client.create().forPath("/test/foo", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); 
+ + client.delete().forPath("/test/foo"); + assertEvent(Type.NODE_REMOVED, "/test/foo"); + client.create().forPath("/test/foo", "two".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); + + assertNoMoreEvents(); + } + + @Test + public void testKilledSession() throws Exception + { + client.create().forPath("/test"); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test/foo", "foo".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/foo"); + client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/me"); + + KillSession2.kill(client.getZookeeperClient().getZooKeeper()); + assertEvent(Type.CONNECTION_LOST); + assertEvent(Type.CONNECTION_RECONNECTED); + assertEvent(Type.INITIALIZED); + assertEvent(Type.NODE_REMOVED, "/test/me", "data".getBytes()); + + assertNoMoreEvents(); + } + + @Test + public void testBasics() throws Exception + { + client.create().forPath("/test"); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.get("/t")); + Assert.assertNull(cache.get("/testing")); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.get("/test/o")); + Assert.assertNull(cache.get("/test/onely")); + + client.setData().forPath("/test/one", "sup!".getBytes()); + assertEvent(Type.NODE_UPDATED, "/test/one"); + 
Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!"); + + client.delete().forPath("/test/one"); + assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes()); + Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); + + assertNoMoreEvents(); + } + + @Test + public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception + { + client.create().forPath("/test"); + + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + + cache.close(); + assertNoMoreEvents(); + + client.delete().forPath("/test/one"); + assertNoMoreEvents(); + } + + /** + * Make sure TreeCache gets to a sane state when we can't initially connect to server. + */ + @Test + public void testServerNotStartedYet() throws Exception + { + // Stop the existing server. + server.stop(); + + // Shutdown the existing client and re-create it started. + client.close(); + initCuratorFramework(); + + // Start the client disconnected. + cache = newCacheWithListeners(client, "/test"); + cache.start(); + assertNoMoreEvents(); + + // Now restart the server. 
+ server.restart(); + assertEvent(Type.INITIALIZED); + + client.create().forPath("/test"); + + assertEvent(Type.NODE_ADDED, "/test"); + assertNoMoreEvents(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java new file mode 100644 index 0000000..f304c24 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Iterables; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.watch.CachedNode; +import org.apache.curator.framework.recipes.watch.CuratorCache; +import org.apache.curator.utils.ZKPaths; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache> +{ + /** + * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()} + */ + private static final class TestNode + { + String fullPath; + byte[] data; + Map<String, TestNode> children = new HashMap<String, TestNode>(); + + TestNode(String fullPath, byte[] data) + { + this.fullPath = fullPath; + this.data = data; + } + } + + // These constants will produce a tree about 10 levels deep. + private static final int ITERATIONS = 1000; + private static final double DIVE_CHANCE = 0.9; + private static final int TEST_DEPTH = 5; + + private final Random random = new Random(); + private boolean withDepth = false; + + /** + * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use + * a CuratorCache to follow the changes. At each step, assert that the cache matches our + * source-of-truth test data, and that we see exactly the set of events we expect to see.
+ */ + + @Test + public void testGiantRandomDeepTree() throws Exception { + client.create().forPath("/tree", null); + CuratorFramework cl = client.usingNamespace("tree"); + cache = newCacheWithListeners(cl, "/"); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + TestNode root = new TestNode("/", new byte[0]); + int maxDepth = 0; + int adds = 0; + int removals = 0; + int updates = 0; + + for ( int i = 0; i < ITERATIONS; ++i ) + { + // Select a node to update, randomly navigate down through the tree + int depth = 0; + TestNode last = null; + TestNode node = root; + while ( !node.children.isEmpty() && random.nextDouble() < DIVE_CHANCE ) + { + // Go down a level in the tree. Select a random child for the next iteration. + last = node; + node = Iterables.get(node.children.values(), random.nextInt(node.children.size())); + ++depth; + } + maxDepth = Math.max(depth, maxDepth); + + // Okay we found a node, let's do something interesting with it. + switch ( random.nextInt(3) ) + { + case 0: + // Try a removal if we have no children and we're not the root node. + if ( node != root && node.children.isEmpty() ) + { + // Delete myself from parent. + TestNode removed = last.children.remove(ZKPaths.getNodeFromPath(node.fullPath)); + Assert.assertSame(node, removed); + + // Delete from ZK + cl.delete().forPath(node.fullPath); + + // The cache should see the delete. + if (shouldSeeEventAt(node.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath); + } + ++removals; + } + break; + case 1: + // Do an update. + byte[] newData = new byte[10]; + random.nextBytes(newData); + + if ( Arrays.equals(node.data, newData) ) + { + // Randomly generated the same data! Very small chance, just skip. + continue; + } + + // Update source-of-truth. + node.data = newData; + + // Update in ZK. + cl.setData().forPath(node.fullPath, node.data); + + // The cache should see the update.
+ if (shouldSeeEventAt(node.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data); + } + + ++updates; + break; + case 2: + // Add a new child. + String name = Long.toHexString(random.nextLong()); + if ( node.children.containsKey(name) ) + { + // Randomly generated the same name! Very small chance, just skip. + continue; + } + + // Add a new child to our test tree. + byte[] data = new byte[10]; + random.nextBytes(data); + TestNode child = new TestNode(ZKPaths.makePath(node.fullPath, name), data); + node.children.put(name, child); + + // Add to ZK. + cl.create().forPath(child.fullPath, child.data); + + // The cache should see the add. + if (shouldSeeEventAt(child.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data); + } + + ++adds; + break; + } + + // Each iteration, ensure the cached state matches our source-of-truth tree. + assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root); + assertTreeEquals(cache, root, 0); + } + + // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320 + // We get more adds than removals because removals only happen if we're at a leaf. + System.out.println(String.format("maxDepth: %s, adds: %s, removals: %s, updates: %s", maxDepth, adds, removals, updates)); + assertNoMoreEvents(); + } + + /** + * Returns true if we should see an event at this path, false otherwise: always true unless withDepth is set, in which case ZKPaths.split(fullPath) must yield at most TEST_DEPTH elements. + */ + private boolean shouldSeeEventAt(String fullPath) + { + return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH; + } + + /** + * Recursively assert that current children equal expected children. When withDepth is set, the comparison stops once depth reaches TEST_DEPTH.
+ */ + private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth) + { + String path = expectedNode.fullPath; + Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path); + Assert.assertNotNull(cacheChildren, path); + + if (withDepth && depth == TEST_DEPTH) { + return; + } + + Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path); + + for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() ) + { + String nodeName = entry.getKey(); + CachedNode childData = cacheChildren.get(nodeName); + TestNode expectedChild = entry.getValue(); + assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild); + assertTreeEquals(cache, expectedChild, depth + 1); + } + } + + /** + * Assert that the given node data matches expected test node data. + */ + private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode) + { + String path = expectedNode.fullPath; + Assert.assertNotNull(actualChild, path); + Assert.assertEquals(actualChild.getData(), expectedNode.data, path); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java index 96ce75c..1a9e366 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java @@ -29,7 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; -public class TestTreeCacheRandomTree extends BaseTestTreeCache +public class TestTreeCacheRandomTree extends
BaseTestTreeCache<TreeCache> { /** * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()} http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java index e4c3b7e..e884b8c 100644 --- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java +++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java @@ -27,16 +27,19 @@ public class WatchersDebug private static final Method getDataWatches; private static final Method getExistWatches; private static final Method getChildWatches; + private static final Method getPersistentWatches; static { Method localGetDataWatches = null; Method localGetExistWatches = null; Method localGetChildWatches = null; + Method localGetPersistentWatches = null; try { localGetDataWatches = getMethod("getDataWatches"); localGetExistWatches = getMethod("getExistWatches"); localGetChildWatches = getMethod("getChildWatches"); + localGetPersistentWatches = getMethod("getPersistentWatches"); } catch ( NoSuchMethodException e ) { @@ -45,6 +48,7 @@ public class WatchersDebug getDataWatches = localGetDataWatches; getExistWatches = localGetExistWatches; getChildWatches = localGetChildWatches; + getPersistentWatches = localGetPersistentWatches; } public static List<String> getDataWatches(ZooKeeper zooKeeper) @@ -62,6 +66,11 @@ public class WatchersDebug return callMethod(zooKeeper, getChildWatches); } + public static List<String> getPersistentWatches(ZooKeeper zooKeeper) + { + return callMethod(zooKeeper, getPersistentWatches); + } + private WatchersDebug() { }
