Abstracted the TreeCache public API and added an alternate implementation that uses the new CuratorCache instead of TreeCache. This should make porting older code much easier.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/02073a71 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/02073a71 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/02073a71 Branch: refs/heads/persistent-watch Commit: 02073a71a3a165babba9a7db84449ef9e7439a19 Parents: 570023d Author: randgalt <[email protected]> Authored: Sun Aug 27 19:03:28 2017 +0200 Committer: randgalt <[email protected]> Committed: Sun Aug 27 19:03:28 2017 +0200 ---------------------------------------------------------------------- .../framework/recipes/cache/ListenerBridge.java | 26 +- .../framework/recipes/cache/TreeCache.java | 88 ++-- .../recipes/cache/TreeCacheBridge.java | 49 ++ .../recipes/cache/TreeCacheBridgeImpl.java | 78 +++ .../framework/recipes/watch/CacheSelectors.java | 1 + .../framework/recipes/watch/CachedNode.java | 104 +--- .../framework/recipes/watch/CachedNodeImpl.java | 90 ++++ .../framework/recipes/watch/CachedNodeMap.java | 2 +- .../framework/recipes/watch/CuratorCache.java | 4 +- .../recipes/watch/CuratorCacheBase.java | 10 +- .../recipes/watch/CuratorCacheBuilder.java | 20 +- .../recipes/watch/InternalCuratorCache.java | 23 +- .../recipes/watch/InternalNodeCache.java | 10 +- .../recipes/cache/BaseTestTreeCache.java | 20 + .../recipes/cache/TestTreeCacheBridge.java | 199 +++++--- .../cache/TestTreeCacheBridgeRandomTree.java | 42 +- .../cache/TestTreeCacheBridgeWrapper.java | 500 +++++++++++++++++++ .../TestTreeCacheBridgeWrapperRandomTree.java | 224 +++++++++ 18 files changed, 1242 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java ---------------------------------------------------------------------- diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java index 29e7b6c..693e1da 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java @@ -26,8 +26,12 @@ import org.apache.curator.framework.recipes.watch.CacheListener; import org.apache.curator.framework.recipes.watch.CachedNode; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ZKPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -186,7 +190,7 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener */ public static ChildData toData(String path, CachedNode affectedNode) { - if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) ) + if ( (path != null) && (affectedNode != null) ) { return new ChildData(path, affectedNode.getStat(), affectedNode.getData()); } @@ -208,6 +212,26 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener return new TreeCacheEvent(type, data); } + public static Map<String, ChildData> toData(String basePath, Map<String, CachedNode> from) + { + if ( from.isEmpty() ) + { + return Collections.emptyMap(); + } + + Map<String, ChildData> mapped = new HashMap<>(); + for ( Map.Entry<String, CachedNode> entry : from.entrySet() ) + { + String path = entry.getKey(); + ChildData childData = toData(ZKPaths.makePath(basePath, path), entry.getValue()); + if ( childData != null ) + { + mapped.put(path, childData); + } + } + return mapped; + } + protected void 
handleException(Exception e) { log.error("Unhandled exception in listener", e); http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index 94e6774..4e663cd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -31,7 +31,11 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.watch.CacheAction; +import org.apache.curator.framework.recipes.watch.CacheSelector; +import org.apache.curator.framework.recipes.watch.CacheSelectors; import org.apache.curator.framework.recipes.watch.CuratorCache; +import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.PathUtils; @@ -43,7 +47,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -73,7 +76,7 @@ import static org.apache.curator.utils.PathUtils.validatePath; * @deprecated use {@link CuratorCache} */ @Deprecated -public class TreeCache implements Closeable +public class TreeCache implements TreeCacheBridge { private static final Logger LOG = 
LoggerFactory.getLogger(TreeCache.class); private final boolean createParentNodes; @@ -109,6 +112,53 @@ public class TreeCache implements Closeable return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector); } + public TreeCacheBridge buildBridge() + { + CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, path); + CacheAction cacheAction; + if ( cacheData ) + { + cacheAction = dataIsCompressed ? CacheAction.STAT_AND_UNCOMPRESSED_DATA : CacheAction.STAT_AND_DATA; + } + else + { + cacheAction = dataIsCompressed ? CacheAction.UNCOMPRESSED_STAT_ONLY : CacheAction.STAT_ONLY; + } + final CacheSelector maxDepthSelector = (maxDepth != Integer.MAX_VALUE) ? CacheSelectors.maxDepth(maxDepth) : null; + final CacheAction sealedCacheAction = cacheAction; + CacheSelector cacheSelector = new CacheSelector() + { + @Override + public boolean traverseChildren(String basePath, String fullPath) + { + //noinspection SimplifiableIfStatement + if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) ) + { + return false; + } + return selector.traverseChildren(fullPath); + } + + @Override + public CacheAction actionForPath(String basePath, String fullPath) + { + if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) ) + { + return CacheAction.NOT_STORED; + } + return selector.acceptChild(fullPath) ? sealedCacheAction : CacheAction.NOT_STORED; + } + }; + builder = builder.withCacheSelector(cacheSelector).withDefaultData(null); + + if ( createParentNodes ) + { + LOG.warn("setCreateParentNodes(true) is not supported by TreeCacheBridge. Use EnsureContainers or MigrationManager instead"); + } + + return new TreeCacheBridgeImpl(client, builder.build()); + } + /** * Sets whether or not to cache byte data per node; default {@code true}. 
*/ @@ -579,12 +629,7 @@ public class TreeCache implements Closeable this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null"); } - /** - * Start the cache. The cache is not started automatically. You must call this method. - * - * @return this - * @throws Exception errors - */ + @Override public TreeCache start() throws Exception { Preconditions.checkState(treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED), "already started"); @@ -600,9 +645,6 @@ public class TreeCache implements Closeable return this; } - /** - * Close/end the cache. - */ @Override public void close() { @@ -624,11 +666,7 @@ public class TreeCache implements Closeable } } - /** - * Return the cache listenable - * - * @return listenable - */ + @Override public Listenable<TreeCacheListener> getListenable() { return listeners; @@ -680,14 +718,7 @@ public class TreeCache implements Closeable return current; } - /** - * Return the current set of children at the given path, mapped by child name. There are no - * guarantees of accuracy; this is merely the most recent view of the data. If there is no - * node at this path, {@code null} is returned. - * - * @param fullPath full path to the node to check - * @return a possibly-empty list of children if the node is alive, or null - */ + @Override public Map<String, ChildData> getCurrentChildren(String fullPath) { TreeNode node = find(fullPath); @@ -721,14 +752,7 @@ public class TreeCache implements Closeable return node.nodeState == NodeState.LIVE ? result : null; } - /** - * Return the current data for the given path. There are no guarantees of accuracy. This is - * merely the most recent view of the data. If there is no node at the given path, - * {@code null} is returned. 
- * - * @param fullPath full path to the node to check - * @return data if the node is alive, or null - */ + @Override public ChildData getCurrentData(String fullPath) { TreeNode node = find(fullPath); http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java new file mode 100644 index 0000000..9c383af --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java @@ -0,0 +1,49 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; +import java.util.Map; + +public interface TreeCacheBridge extends Closeable +{ + /** + * Start the cache. The cache is not started automatically. You must call this method. + * + * @return this + * @throws Exception errors + */ + TreeCacheBridge start() throws Exception; + + /** + * Close/end the cache. + */ + @Override + void close(); + + /** + * Return the cache listenable + * + * @return listenable + */ + Listenable<TreeCacheListener> getListenable(); + + /** + * Return the current set of children at the given path, mapped by child name. There are no + * guarantees of accuracy; this is merely the most recent view of the data. If there is no + * node at this path, {@code null} is returned. + * + * @param fullPath full path to the node to check + * @return a possibly-empty list of children if the node is alive, or null + */ + Map<String, ChildData> getCurrentChildren(String fullPath); + + /** + * Return the current data for the given path. There are no guarantees of accuracy. This is + * merely the most recent view of the data. 
If there is no node at the given path, + * {@code null} is returned. + * + * @param fullPath full path to the node to check + * @return data if the node is alive, or null + */ + ChildData getCurrentData(String fullPath); +} http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java new file mode 100644 index 0000000..0198aa4 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java @@ -0,0 +1,78 @@ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.recipes.watch.CuratorCache; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +public class TreeCacheBridgeImpl implements TreeCacheBridge, Listenable<TreeCacheListener> +{ + private final CuratorFramework client; + private final CuratorCache cache; + private final Map<TreeCacheListener, ListenerBridge> listenerMap = new ConcurrentHashMap<>(); + + public TreeCacheBridgeImpl(CuratorFramework client, CuratorCache cache) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.cache = Objects.requireNonNull(cache, "cache cannot be null"); + } + + @Override + public TreeCacheBridge start() + { + cache.start(); + return this; + } + + @Override + public void close() + { + cache.close(); + } + + @Override + public Listenable<TreeCacheListener> getListenable() + { + return 
this; + } + + @Override + public Map<String, ChildData> getCurrentChildren(String fullPath) + { + return ListenerBridge.toData(fullPath, cache.childrenAtPath(fullPath)); + } + + @Override + public ChildData getCurrentData(String fullPath) + { + return ListenerBridge.toData(fullPath, cache.get(fullPath)); + } + + @Override + public void addListener(TreeCacheListener listener) + { + addListener(listener, MoreExecutors.directExecutor()); + } + + @Override + public void addListener(TreeCacheListener listener, Executor executor) + { + ListenerBridge listenerBridge = ListenerBridge.wrap(client, cache.getListenable(), listener); + listenerBridge.add(); + listenerMap.put(listener, listenerBridge); + } + + @Override + public void removeListener(TreeCacheListener listener) + { + ListenerBridge listenerBridge = listenerMap.remove(listener); + if ( listenerBridge != null ) + { + listenerBridge.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java index eaf1145..8814e57 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java @@ -23,6 +23,7 @@ import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.server.PathIterator; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; 
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java index f3cd18a..b07993f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java @@ -1,108 +1,10 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ package org.apache.curator.framework.recipes.watch; import org.apache.zookeeper.data.Stat; -import java.util.Arrays; -import java.util.Objects; -/** - * Represents the data for a cached node - */ -public class CachedNode +public interface CachedNode { - private final Stat stat; - private final byte[] data; + Stat getStat(); - private static final byte[] defaultData = new byte[0]; - - /** - * Creates an empty node - */ - public CachedNode() - { - this(new Stat(), defaultData); - } - - /** - * A node with a stat but empty data - * - * @param stat the stat - */ - public CachedNode(Stat stat) - { - this(stat, defaultData); - } - - /** - * @param stat the stat - * @param data uncompressed data. If <code>null</code> an empty array is substituted. - */ - public CachedNode(Stat stat, byte[] data) - { - this.stat = Objects.requireNonNull(stat, "stat cannot be null"); - this.data = (data != null) ? data : defaultData; - } - - public Stat getStat() - { - return stat; - } - - public byte[] getData() - { - return data; - } - - @Override - public boolean equals(Object o) - { - if ( this == o ) - { - return true; - } - if ( o == null || getClass() != o.getClass() ) - { - return false; - } - - CachedNode that = (CachedNode)o; - - //noinspection SimplifiableIfStatement - if ( !stat.equals(that.stat) ) - { - return false; - } - return Arrays.equals(data, that.data); - } - - @Override - public int hashCode() - { - int result = stat.hashCode(); - result = 31 * result + Arrays.hashCode(data); - return result; - } - - @Override - public String toString() - { - return "CachedNode{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}'; - } + byte[] getData(); } http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java ---------------------------------------------------------------------- diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java new file mode 100644 index 0000000..91b6a5e --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.zookeeper.data.Stat; +import java.util.Arrays; +import java.util.Objects; + +/** + * Represents the data for a cached node + */ +public class CachedNodeImpl implements CachedNode +{ + private final Stat stat; + private final byte[] data; + + /** + * @param stat the stat + * @param data uncompressed data or null - NOTE: ownership is taken of the given data object + */ + public CachedNodeImpl(Stat stat, byte[] data) + { + this.stat = Objects.requireNonNull(stat, "stat cannot be null"); + this.data = data; + } + + @Override + public Stat getStat() + { + return stat; + } + + @Override + public byte[] getData() + { + return data; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + CachedNodeImpl that = (CachedNodeImpl)o; + + //noinspection SimplifiableIfStatement + if ( !stat.equals(that.stat) ) + { + return false; + } + return Arrays.equals(data, that.data); + } + + @Override + public int hashCode() + { + int result = stat.hashCode(); + result = 31 * result + Arrays.hashCode(data); + return result; + } + + @Override + public String toString() + { + return "CachedNodeImpl{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}'; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java index d59ffae..adcd458 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java +++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java @@ -22,7 +22,7 @@ import java.util.Collection; import java.util.Map; /** - * Interface for a container of path to {@link org.apache.curator.framework.recipes.watch.CachedNode} + * Interface for a container of path to {@link CachedNodeImpl} */ public interface CachedNodeMap { http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java index e97157e..2742000 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java @@ -150,7 +150,7 @@ public interface CuratorCache extends Closeable /** * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link CachedNode#getData()} for this node will return <code>null</code>. + * calls to {@link CachedNodeImpl#getData()} for this node will return <code>null</code>. * * @param path the path of the node to clear */ @@ -158,7 +158,7 @@ public interface CuratorCache extends Closeable /** * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link CachedNode#getData()} for this node will return <code>null</code>. + * calls to {@link CachedNodeImpl#getData()} for this node will return <code>null</code>. 
* * @param path the path of the node to clear * @param ifVersion if non-negative, only clear the data if the data's version matches this version http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java index 1aa5b5d..c072f7c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; abstract class CuratorCacheBase implements CuratorCache { protected final CachedNodeMap cache; + protected final byte[] defaultData; private final String mainPath; private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>(); private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); @@ -54,11 +55,12 @@ abstract class CuratorCacheBase implements CuratorCache CLOSED } - protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents) + protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents, byte[] defaultData) { this.mainPath = Objects.requireNonNull(mainPath, "mainPath cannot be null"); this.cache = Objects.requireNonNull(cache, "cache cannot be null"); this.sendRefreshEvents = sendRefreshEvents; + this.defaultData = defaultData; } @Override @@ -140,7 +142,7 @@ abstract class CuratorCacheBase implements CuratorCache /** * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link CachedNode#getData()} for this node will return <code>null</code>. 
+ * calls to {@link CachedNodeImpl#getData()} for this node will return <code>null</code>. * * @param path the path of the node to clear */ @@ -152,7 +154,7 @@ abstract class CuratorCacheBase implements CuratorCache /** * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link CachedNode#getData()} for this node will return <code>null</code>. + * calls to {@link CachedNodeImpl#getData()} for this node will return <code>null</code>. * * @param path the path of the node to clear * @param ifVersion if non-negative, only clear the data if the data's version matches this version @@ -166,7 +168,7 @@ abstract class CuratorCacheBase implements CuratorCache { if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) ) { - return cache.replace(path, data, new CachedNode(data.getStat())); + return cache.replace(path, data, new CachedNodeImpl(data.getStat(), defaultData)); } } return false; http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java index 10f8bc2..eb8d0df 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.watch; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import java.util.Arrays; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -40,6 +41,7 @@ public class CuratorCacheBuilder private boolean 
usingSoftValues = false; private Long expiresAfterWriteMs = null; private Long expiresAfterAccessMs = null; + private byte[] defaultData = new byte[0]; /** * Start a new builder for the given client and main path @@ -66,10 +68,10 @@ public class CuratorCacheBuilder { Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors"); Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY"); - return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart); + return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart, defaultData); } - return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren); + return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren, defaultData); } /** @@ -215,6 +217,20 @@ public class CuratorCacheBuilder return this; } + /** + * If a node does not contain data (due to the CacheSelector or other reason), the default is + * to set {@link CachedNodeImpl#getData()} to an empty byte array (<code>byte[0]</code>). Use this + * method to change to another value or pass <code>null</code> + * + * @param data data bytes or null + * @return this + */ + public CuratorCacheBuilder withDefaultData(byte[] data) + { + this.defaultData = (data != null) ? 
Arrays.copyOf(data, data.length) : null; + return this; + } + private CuratorCacheBuilder(CuratorFramework client, String path) { this.client = Objects.requireNonNull(client, "client cannot be null"); http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java index edd08b5..4b88bf4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java @@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.watch; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.SettableFuture; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; @@ -30,6 +29,7 @@ import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; @@ -38,11 +38,10 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; -import java.util.concurrent.atomic.AtomicInteger; class InternalCuratorCache extends CuratorCacheBase implements Watcher { - private static final CachedNode nullNode = new CachedNode(); + private final CachedNode nullNode; private final Logger log = 
LoggerFactory.getLogger(getClass()); private final PersistentWatcher watcher; private final CuratorFramework client; @@ -64,9 +63,9 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher } }; - InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren) + InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren, byte[] defaultData) { - super(path, cache, sendRefreshEvents); + super(path, cache, sendRefreshEvents, defaultData); this.client = Objects.requireNonNull(client, "client cannot be null"); this.basePath = Objects.requireNonNull(path, "path cannot be null"); this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null"); @@ -84,6 +83,8 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher } }; watcher.getListenable().addListener(this); + + nullNode = new CachedNodeImpl(new Stat(), defaultData); } @Override @@ -163,7 +164,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher if ( event.getType() == CuratorEventType.GET_DATA ) { CacheAction cacheAction = (CacheAction)event.getContext(); - CachedNode newNode = new CachedNode(event.getStat(), event.getData()); + CachedNode newNode = new CachedNodeImpl(event.getStat(), event.getData()); CachedNode oldNode = putNewNode(path, cacheAction, newNode); if ( oldNode == null ) { @@ -292,7 +293,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher case STAT_ONLY: case UNCOMPRESSED_STAT_ONLY: { - putNode = new CachedNode(newNode.getStat()); + putNode = new CachedNodeImpl(newNode.getStat(), defaultData); break; } @@ -306,14 +307,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher return cache.put(path, putNode); } - private 
void decrementOutstanding(SettableFuture<Boolean> task, AtomicInteger outstandingCount) - { - if ( outstandingCount.decrementAndGet() <= 0 ) - { - task.set(true); - } - } - private void remove(String path) { CachedNode removed = cache.remove(path); http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java index dd880f6..5f8d750 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java @@ -31,6 +31,7 @@ import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; @@ -44,7 +45,7 @@ class InternalNodeCache extends CuratorCacheBase private final String path; private final CacheAction cacheAction; private final AtomicBoolean isConnected = new AtomicBoolean(true); - private static final CachedNode nullNode = new CachedNode(); + private final CachedNode nullNode; private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -90,12 +91,13 @@ class InternalNodeCache extends CuratorCacheBase } }; - InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean sendRefreshEvents, boolean refreshOnStart) + InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean 
sendRefreshEvents, boolean refreshOnStart, byte[] defaultData) { - super(path, cache, sendRefreshEvents); + super(path, cache, sendRefreshEvents, defaultData); this.client = client.newWatcherRemoveCuratorFramework(); this.path = PathUtils.validatePath(path); this.cacheAction = cacheAction; + nullNode = new CachedNodeImpl(new Stat(), defaultData); Preconditions.checkArgument(refreshOnStart, "refreshingWhenStarted() must be true when forSingleNode() is used"); } @@ -156,7 +158,7 @@ class InternalNodeCache extends CuratorCacheBase { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - CachedNode cachedNode = new CachedNode(event.getStat(), event.getData()); + CachedNode cachedNode = new CachedNodeImpl(event.getStat(), event.getData()); setNewData(cachedNode); } break; http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index 9cbec98..7f1f547 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -88,6 +88,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests } /** + * Construct a TreeCache that records exceptions and automatically listens. 
+ */ + protected TreeCacheBridge newTreeCacheBridgeWithListeners(CuratorFramework client, String path) + { + TreeCacheBridge result = TreeCache.newBuilder(client, path).buildBridge(); + result.getListenable().addListener(eventListener); + return result; + } + + /** * Construct a CuratorCache that records exceptions and automatically listens using the bridge. */ protected CuratorCache newCacheWithListeners(CuratorFramework client, String path) @@ -118,6 +128,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests return result; } + /** + * Finish constructing a TreeCache that records exceptions and automatically listens. + */ + protected TreeCacheBridge buildBridgeWithListeners(TreeCache.Builder builder) + { + TreeCacheBridge result = builder.buildBridge(); + result.getListenable().addListener(eventListener); + return result; + } + @Override @BeforeMethod public void setup() throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java index 049daa5..614fa56 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java @@ -20,16 +20,17 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.ImmutableSet; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; -import org.apache.curator.framework.recipes.watch.CacheSelectors; -import org.apache.curator.framework.recipes.watch.CuratorCache; -import 
org.apache.curator.framework.recipes.watch.CuratorCacheBuilder; import org.apache.curator.test.compatibility.KillSession2; +import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.concurrent.Semaphore; -public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> +@SuppressWarnings("deprecation") +public class TestTreeCacheBridge extends BaseTestTreeCache<TreeCacheBridge> { @Test public void testSelector() throws Exception @@ -57,7 +58,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> return !fullPath.equals("/root/n1-c"); } }; - cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector))); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/root").setSelector(selector)); cache.start(); assertEvent(Type.NODE_ADDED, "/root"); @@ -79,7 +80,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test/3", "three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes()); @@ -89,16 +90,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3")); - Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of()); - Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub")); - Assert.assertNull(cache.get("/test/non_exist")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3")); + 
Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of("sub")); + Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty()); } @Test public void testStartEmpty() throws Exception { - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.INITIALIZED); @@ -110,7 +111,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> @Test public void testStartEmptyDeeper() throws Exception { - cache = newCacheWithListeners(client, "/test/foo/bar"); + cache = newTreeCacheBridgeWithListeners(client, "/test/foo/bar"); cache.start(); assertEvent(Type.INITIALIZED); @@ -130,17 +131,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test/3", "three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); - CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0)); - cache = buildCacheWithListeners(builder); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(0)); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); - Assert.assertNull(cache.get("/test/1")); - Assert.assertNull(cache.get("/test/1")); - Assert.assertNull(cache.get("/test/non_exist")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/1")); + Assert.assertTrue(cache.getCurrentChildren("/test/1").isEmpty()); + Assert.assertNull(cache.getCurrentData("/test/non_exist")); } @Test @@ -152,8 +152,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test/3", 
"three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); - CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1)); - cache = buildCacheWithListeners(builder); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(1)); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes()); @@ -162,12 +161,12 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3")); - Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of()); - Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of()); - Assert.assertNull(cache.get("/test/1/sub")); - Assert.assertNull(cache.get("/test/2/sub")); - Assert.assertNull(cache.get("/test/non_exist")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3")); + Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/2/sub")); + Assert.assertTrue(cache.getCurrentChildren("/test/2/sub").isEmpty()); + Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty()); } @Test @@ -181,8 +180,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test/foo/bar/3", "three".getBytes()); client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes()); - CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1)); - cache = buildCacheWithListeners(builder); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test/foo/bar").setMaxDepth(1)); 
cache.start(); assertEvent(Type.NODE_ADDED, "/test/foo/bar"); assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes()); @@ -198,7 +196,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/one"); @@ -212,7 +210,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); - cache = newCacheWithListeners(client, "/"); + cache = newTreeCacheBridgeWithListeners(client, "/"); cache.start(); assertEvent(Type.NODE_ADDED, "/"); assertEvent(Type.NODE_ADDED, "/test"); @@ -220,10 +218,10 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test")); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); - Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); } @Test @@ -232,18 +230,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); - CuratorCacheBuilder builder = 
CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1)); - cache = buildCacheWithListeners(builder); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/").setMaxDepth(1)); cache.start(); assertEvent(Type.NODE_ADDED, "/"); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test")); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); - Assert.assertNull(cache.get("/test/one")); - Assert.assertNull(cache.get("/test/one")); + Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/one")); + Assert.assertTrue(cache.getCurrentChildren("/test/one").isEmpty()); } @Test @@ -254,16 +251,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/outer/test"); client.create().forPath("/outer/test/one", "hey there".getBytes()); - cache = newCacheWithListeners(client.usingNamespace("outer"), "/test"); + cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/one"); assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); - Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); } @Test @@ -274,7 +271,7 @@ public class 
TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/outer/test"); client.create().forPath("/outer/test/one", "hey there".getBytes()); - cache = newCacheWithListeners(client.usingNamespace("outer"), "/"); + cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/"); cache.start(); assertEvent(Type.NODE_ADDED, "/"); assertEvent(Type.NODE_ADDED, "/foo"); @@ -282,17 +279,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> assertEvent(Type.NODE_ADDED, "/test/one"); assertEvent(Type.INITIALIZED); assertNoMoreEvents(); - Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test")); - Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of()); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); - Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + Assert.assertEquals(cache.getCurrentChildren("/").keySet(), ImmutableSet.of("foo", "test")); + Assert.assertEquals(cache.getCurrentChildren("/foo").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); } @Test public void testSyncInitialPopulation() throws Exception { - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.INITIALIZED); @@ -311,7 +308,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test/2", "2".getBytes()); client.create().forPath("/test/3", "3".getBytes()); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, 
"/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/1"); @@ -326,7 +323,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> { client.create().forPath("/test"); - cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly())); + cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setCacheData(false)); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); @@ -338,9 +335,9 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> assertEvent(Type.NODE_UPDATED, "/test/foo"); assertNoMoreEvents(); - Assert.assertNotNull(cache.get("/test/foo")); + Assert.assertNotNull(cache.getCurrentData("/test/foo")); // No byte data querying the tree because we're not caching data. - Assert.assertEquals(cache.get("/test/foo").getData().length, 0); + Assert.assertNull(cache.getCurrentData("/test/foo").getData()); } @Test @@ -349,7 +346,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test"); client.create().forPath("/test/foo", "one".getBytes()); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.NODE_ADDED, "/test/foo"); @@ -374,7 +371,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> client.create().forPath("/test"); client.create().forPath("/test/foo", "one".getBytes()); - cache = newCacheWithListeners(client, "/test/foo"); + cache = newTreeCacheBridgeWithListeners(client, "/test/foo"); cache.start(); assertEvent(Type.NODE_ADDED, "/test/foo"); assertEvent(Type.INITIALIZED); @@ -397,7 +394,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> { client.create().forPath("/test"); - cache = newCacheWithListeners(client, "/test"); + cache = 
newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); @@ -421,47 +418,103 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> { client.create().forPath("/test"); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); - Assert.assertNull(cache.get("/t")); - Assert.assertNull(cache.get("/testing")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); + Assert.assertTrue(cache.getCurrentChildren("/t").isEmpty()); + Assert.assertTrue(cache.getCurrentChildren("/testing").isEmpty()); client.create().forPath("/test/one", "hey there".getBytes()); assertEvent(Type.NODE_ADDED, "/test/one"); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); - Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); - Assert.assertNull(cache.get("/test/o")); - Assert.assertNull(cache.get("/test/onely")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); + Assert.assertTrue(cache.getCurrentChildren("/test/o").isEmpty()); + Assert.assertTrue(cache.getCurrentChildren("/test/onely").isEmpty()); client.setData().forPath("/test/one", "sup!".getBytes()); assertEvent(Type.NODE_UPDATED, "/test/one"); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one")); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!"); + 
Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); client.delete().forPath("/test/one"); assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes()); - Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); assertNoMoreEvents(); } @Test + public void testBasicsOnTwoCaches() throws Exception + { + TreeCacheBridge cache2 = newTreeCacheBridgeWithListeners(client, "/test"); + cache2.getListenable().removeListener(eventListener); // Don't listen on the second cache. + + // Just ensures the same event count; enables test flow control on cache2. + final Semaphore semaphore = new Semaphore(0); + cache2.getListenable().addListener(new TreeCacheListener() + { + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + semaphore.release(); + } + }); + + try + { + client.create().forPath("/test"); + + cache = newTreeCacheBridgeWithListeners(client, "/test"); + cache.start(); + cache2.start(); + + assertEvent(Type.NODE_ADDED, "/test"); + assertEvent(Type.INITIALIZED); + semaphore.acquire(2); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + semaphore.acquire(); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "hey there"); + + client.setData().forPath("/test/one", "sup!".getBytes()); + assertEvent(Type.NODE_UPDATED, "/test/one"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + semaphore.acquire(); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); + + client.delete().forPath("/test/one"); + assertEvent(Type.NODE_REMOVED, "/test/one", 
"sup!".getBytes()); + Assert.assertNull(cache.getCurrentData("/test/one")); + semaphore.acquire(); + Assert.assertNull(cache2.getCurrentData("/test/one")); + + assertNoMoreEvents(); + Assert.assertEquals(semaphore.availablePermits(), 0); + } + finally + { + CloseableUtils.closeQuietly(cache2); + } + } + + @Test public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception { client.create().forPath("/test"); - cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertEvent(Type.NODE_ADDED, "/test"); assertEvent(Type.INITIALIZED); client.create().forPath("/test/one", "hey there".getBytes()); assertEvent(Type.NODE_ADDED, "/test/one"); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); cache.close(); assertNoMoreEvents(); @@ -484,7 +537,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache> initCuratorFramework(); // Start the client disconnected. 
- cache = newCacheWithListeners(client, "/test"); + cache = newTreeCacheBridgeWithListeners(client, "/test"); cache.start(); assertNoMoreEvents(); http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java index f304c24..140ee64 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java @@ -21,8 +21,6 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.Iterables; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.watch.CachedNode; -import org.apache.curator.framework.recipes.watch.CuratorCache; import org.apache.curator.utils.ZKPaths; import org.testng.Assert; import org.testng.annotations.Test; @@ -31,7 +29,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; -public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache> +@SuppressWarnings("deprecation") +public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<TreeCacheBridge> { /** * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()} @@ -57,22 +56,39 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach private final Random random = new Random(); private boolean withDepth = false; + @Test + public void testGiantRandomDeepTree() throws Exception { + doTestGiantRandomDeepTree(); + } + + @Test + public void 
testGiantRandomDeepTreeWithDepth() throws Exception { + withDepth = true; + doTestGiantRandomDeepTree(); + } + /** * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use * a TreeCache to follow the changes. At each step, assert that TreeCache matches our * source-of-truth test data, and that we see exactly the set of events we expect to see. */ - - @Test - public void testGiantRandomDeepTree() throws Exception { + private void doTestGiantRandomDeepTree() throws Exception + { client.create().forPath("/tree", null); CuratorFramework cl = client.usingNamespace("tree"); - cache = newCacheWithListeners(cl, "/"); + if ( withDepth ) + { + cache = buildBridgeWithListeners(TreeCache.newBuilder(cl, "/").setMaxDepth(TEST_DEPTH)); + } + else + { + cache = newTreeCacheBridgeWithListeners(cl, "/"); + } cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); assertEvent(TreeCacheEvent.Type.INITIALIZED); - TestNode root = new TestNode("/", new byte[0]); + TestNode root = new TestNode("/", null); int maxDepth = 0; int adds = 0; int removals = 0; @@ -169,7 +185,7 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach } // Each iteration, ensure the cached state matches our source-of-truth tree. - assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root); + assertNodeEquals(cache.getCurrentData("/"), root); assertTreeEquals(cache, root, 0); } @@ -190,10 +206,10 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach /** * Recursively assert that current children equal expected children. 
*/ - private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth) + private void assertTreeEquals(TreeCacheBridge cache, TestNode expectedNode, int depth) { String path = expectedNode.fullPath; - Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path); + Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path); Assert.assertNotNull(cacheChildren, path); if (withDepth && depth == TEST_DEPTH) { @@ -205,9 +221,9 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() ) { String nodeName = entry.getKey(); - CachedNode childData = cacheChildren.get(nodeName); + ChildData childData = cacheChildren.get(nodeName); TestNode expectedChild = entry.getValue(); - assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild); + assertNodeEquals(childData, expectedChild); assertTreeEquals(cache, expectedChild, depth + 1); } }
