This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 0210ab765dc256b811a9a3a16cacac7f0c0fa573
Author: randgalt <[email protected]>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300

    wip
---
 .../framework/recipes/cache/CuratorCache.java      | 141 ++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 273 +++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        | 236 +++++++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 140 ++++++++
 .../curator/framework/recipes/cache/NodeCache.java |   3 +
 .../recipes/cache/NodeCacheListenerWrapper.java    |  47 +++
 .../framework/recipes/cache/PathChildrenCache.java |   3 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  71 ++++
 .../recipes/cache/StandardCuratorCacheStorage.java | 111 ++++++
 .../curator/framework/recipes/cache/TreeCache.java |   3 +
 .../recipes/cache/TreeCacheListenerWrapper.java    |  71 ++++
 .../framework/recipes/watch/PersistentWatcher.java |  39 ++-
 .../framework/recipes/cache/TestCuratorCache.java  | 110 ++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 384 +++++++++++++++++++++
 .../cache/TestCuratorCacheEventOrdering.java       |  49 +++
 .../recipes/cache/TestCuratorCacheWrappers.java    | 114 ++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 165 +++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  20 ++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 +
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 ----
 21 files changed, 1997 insertions(+), 78 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..44465c8
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. 
Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to 
update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified 
when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, 
always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Cache the entire tree of nodes starting at the given node
+         */
+        RECURSIVE,
+
+        /**
+         * Decompress data via {@link 
org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options using 
a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()}
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... 
options)
+    {
+        return build(client, CuratorCacheStorage.standard(), path, options);
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options and 
the given storage instance
+     *
+     * @param client Curator client
+     * @param storage storage to use
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()}
+     */
+    static CuratorCache build(CuratorFramework client, CuratorCacheStorage 
storage, String path, Options... options)
+    {
+        return new CuratorCacheImpl(client, storage, path, options);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc. Note: also calls 
{@link CuratorCacheStorage#close()}
+     */
+    @Override
+    void close();
+
+    /**
+     * Utility to force a rebuild of the cache. Normally, this should not ever 
be needed
+     */
+    void forceRebuild();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the root node being cached (i.e. the node passed to the builder)
+     *
+     * @return root node path
+     */
+    String getRootPath();
+
+    /**
+     * Convenience to return the root node data
+     *
+     * @return data (if it's in the cache)
+     */
+    default Optional<ChildData> getRootData()
+    {
+        return storage().get(getRootPath());
+    }
+
+    /**
+     * Return the listener container so that listeners can be registered to be 
notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * By default any unexpected exception is handled by logging the 
exception. You can change
+     * so that a handler is called instead. Under normal circumstances, this 
shouldn't be necessary.
+     *
+     * @param newHandler new exception handler or {@code null} to reset to the 
default logging
+     */
+    void setExceptionHandler(Consumer<Exception> newHandler);
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..a31f841
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final StandardListenerManager<CuratorCacheListener> 
listenerManager = StandardListenerManager.standard();
+    private volatile Consumer<Exception> exceptionHandler;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, 
String path, Options... optionsArg)
+    {
+        Set<Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = storage;
+        this.path = path;
+        this.recursive = options.contains(Options.RECURSIVE);
+        this.compressedData = options.contains(Options.COMPRESSED_DATA);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::forceRebuild);
+        setExceptionHandler(null);
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Already started");
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            storage.close();
+        }
+    }
+
+    @Override
+    public void forceRebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    @Override
+    public String getRootPath()
+    {
+        return path;
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public void setExceptionHandler(Consumer<Exception> newHandler)
+    {
+        this.exceptionHandler = (newHandler != null) ? newHandler : e -> 
log.error("CuratorCache error", e);
+    }
+
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        switch ( event.getType() )
+        {
+            case NodeDataChanged:
+            case NodeCreated:
+            {
+                nodeChanged(event.getPath());
+                break;
+            }
+
+            case NodeDeleted:
+            {
+                removeStorage(event.getPath());
+                break;
+            }
+
+            case NodeChildrenChanged:
+            {
+                nodeChildrenChanged(event.getPath());
+                break;
+            }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> 
nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+            };
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), 
event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+            };
+            if ( compressedData )
+            {
+                
client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != 
data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.nodeChanged(previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.nodeChanged(null, data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> 
l.nodeChanged(previousData, null)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.runSafe(() -> listenerManager.forEach(proc));
+        }
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        
handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+    }
+
+    private void handleException(Exception e)
+    {
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..cd9b66d
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.function.Consumer;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is 
general purpose
+ * but various event specific modifiers are provided. See the doc for each one 
for details
+ */
+@SuppressWarnings("deprecation")
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    /**
+     * <p>
+     *     Called when a node is created, changed or deleted.
+     *     The general contract is that the type of change is implied by the 
arguments. If
+     *     both {@code oldNode} and {@code node} are non-null, this is an 
existing node in the cache
+     *     that has changed. If only {@code oldNode} is non-null, this is an 
existing node that is
+     *     being removed from the cache. If only {@code node} is non-null, 
this is a new node. Note:
+     *     {@code oldNode} and {@code node} will never both be null.
+     * </p>
+     *
+     * <p>
+     *     You can get a descriptive
+     *     type for the operation by calling {@link 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type#get(ChildData,
 ChildData)}
+     *     with the {@code oldNode} and {@code node} arguments
+     * </p>
+     *
+     * @param oldNode the old node or null
+     * @param node the new node or null
+     */
+    void nodeChanged(ChildData oldNode, ChildData node);
+
+    /**
+     * An enumerated type that describes a change
+     */
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+        ;
+
+        /**
+         * Given arguments to {@link 
org.apache.curator.framework.recipes.cache.CuratorCacheListener#nodeChanged(ChildData,
 ChildData)}
+         * return the enumerated event type
+         *
+         * @param oldNode the old node or null
+         * @param node the main node or null
+         * @return event type
+         * @throws IllegalArgumentException if both oldNode and node are null
+         */
+        public static Type get(ChildData oldNode, ChildData node)
+        {
+            if ( (oldNode != null) && (node != null) )
+            {
+                return NODE_CHANGED;
+            }
+            else if ( oldNode != null )
+            {
+                return NODE_DELETED;
+            }
+            else if ( node != null )
+            {
+                return NODE_CREATED;
+            }
+            throw new IllegalArgumentException("oldNode and node cannot both 
be null");
+        }
+    }
+
+    /**
+     * Return a listener that only responds to {@link Type#NODE_CREATED} events
+     *
+     * @param listener listener to wrap
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forCreates(Consumer<ChildData> listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) == Type.NODE_CREATED )
+            {
+                listener.accept(node);
+            }
+        };
+    }
+
+    /**
+     * Return a listener that only responds to {@link Type#NODE_CHANGED} events
+     *
+     * @param listener listener to wrap
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forChanges(CuratorCacheListener listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) == Type.NODE_CHANGED )
+            {
+                listener.nodeChanged(oldNode, node);
+            }
+        };
+    }
+
+    /**
+     * Return a listener that only responds to {@link Type#NODE_DELETED} events
+     *
+     * @param listener listener to wrap
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forDeletes(Consumer<ChildData> listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) == Type.NODE_DELETED )
+            {
+                listener.accept(oldNode);
+            }
+        };
+    }
+
+    /**
+     * Return a listener that forwards all event types to the non-null 
provided listeners.
+     *
+     * @param createsListener if not null, called for {@link 
Type#NODE_CREATED} events
+     * @param changesListener if not null, called for {@link 
Type#NODE_CHANGED} events
+     * @param deletesListener if not null, called for {@link 
Type#NODE_DELETED} events
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forAll(Consumer<ChildData> createsListener, 
CuratorCacheListener changesListener, Consumer<ChildData> deletesListener)
+    {
+        return (oldNode, node) -> {
+            switch ( Type.get(oldNode, node) )
+            {
+                case NODE_CREATED:
+                {
+                    if ( createsListener != null )
+                    {
+                        createsListener.accept(node);
+                    }
+                    break;
+                }
+
+                case NODE_CHANGED:
+                {
+                    if ( changesListener != null )
+                    {
+                        changesListener.nodeChanged(oldNode, node);
+                    }
+                    break;
+                }
+
+                case NODE_DELETED:
+                {
+                    if ( deletesListener != null )
+                    {
+                        deletesListener.accept(oldNode);
+                    }
+                    break;
+                }
+            }
+        };
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the 
listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener 
container. Also note that CuratorCache
+     * behaves differently than {@link 
org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, 
PathChildrenCacheListener listener)
+    {
+        return new PathChildrenCacheListenerWrapper(listener, client);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the 
listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener 
container. Also note that CuratorCache
+     * behaves differently than {@link 
org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, 
TreeCacheListener listener)
+    {
+        return new TreeCacheListenerWrapper(client, listener);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(NodeCacheListener listener)
+    {
+        return new NodeCacheListenerWrapper(listener);
+    }
+
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..1044a4a
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.io.Closeable;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link 
org.apache.curator.framework.recipes.cache.CuratorCache}
+ */
+public interface CuratorCacheStorage extends Closeable
+{
+    /**
+     * Return a new standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard() {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Return a new storage instance that does not retain the data bytes. i.e. 
ChildData objects
+     * returned by this storage will always return {@code null} for {@link 
ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached() {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Returns a new copy of this storage that does not clear its internal 
data when it is closed.
+     * Useful for retaining the cache after it is closed.
+     *
+     * @return new copy that does not clear on close
+     */
+    CuratorCacheStorage doNotClearOnClose();
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return true if the storage currently has an entry for the given path
+     *
+     * @param path path to check
+     * @return true/false
+     */
+    boolean containsPath(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage 
instance, the stream
+     * behaves like a stream returned by {@link 
java.util.concurrent.ConcurrentHashMap#entrySet()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate 
children of the given node.
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - given a stream of child nodes, build a map. Note: it is 
assumed that each child
+     * data in the stream has a unique path
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.map(data -> new 
AbstractMap.SimpleEntry<>(data.getPath(), data))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+
+    /**
+     * Close the storage. For a standard storage instance, the storage is 
cleared.
+     */
+    @Override
+    default void close()
+    {
+        clear();
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..f730f78 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..89360aa
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+public class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    public NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldNode, ChildData node)
+    {
+        callListener();
+    }
+
+    private void callListener()
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..eb37936 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
 @SuppressWarnings("NullableProblems")
+@Deprecated
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..0393204
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    public PathChildrenCacheListenerWrapper(PathChildrenCacheListener 
listener, CuratorFramework client)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..d92eddf
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this(new ConcurrentHashMap<>(), cacheBytes);
+    }
+
+    @Override
+    public CuratorCacheStorage doNotClearOnClose()
+    {
+        Map<String, ChildData> copyMap = new ConcurrentHashMap<>(dataMap);
+        return new StandardCuratorCacheStorage(copyMap, cacheBytes)
+        {
+            @Override
+            public void close()
+            {
+                // NOP
+            }
+        };
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new 
ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public boolean containsPath(String path)
+    {
+        return dataMap.containsKey(path);
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> {
+                ZKPaths.PathAndNode pathAndNode = 
ZKPaths.getPathAndNode(entry.getKey());
+                return pathAndNode.getPath().equals(fromParent);
+            })
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+
+    private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, 
boolean cacheBytes)
+    {
+        this.dataMap = dataMap;
+        this.cacheBytes = cacheBytes;
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e321eba 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static 
org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..0c3a75b
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener 
listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+        case NODE_CREATED:
+        {
+            sendEvent(node, TreeCacheEvent.Type.NODE_ADDED);
+            break;
+        }
+
+        case NODE_CHANGED:
+        {
+            sendEvent(node, TreeCacheEvent.Type.NODE_UPDATED);
+            break;
+        }
+
+        case NODE_DELETED:
+        {
+            sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+            break;
+        }
+        }
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..40174fa 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
      private final Logger log = LoggerFactory.getLogger(getClass());
      private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
      private final StandardListenerManager<Watcher> listeners = 
StandardListenerManager.standard();
+     private final StandardListenerManager<Runnable> resetListeners = 
StandardListenerManager.standard();
      private final ConnectionStateListener connectionStateListener = (client, 
newState) -> {
          if ( newState.isConnected() )
          {
@@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference;
          return listeners;
      }
 
+     /**
+      * Listeners are called when the persistent watcher has been successfully 
registered
+      * or re-registered after a connection disruption
+      *
+      * @return listener container
+      */
+     public StandardListenerManager<Runnable> getResetListenable()
+     {
+         return resetListeners;
+     }
+
      private void reset()
      {
          try
          {
              BackgroundCallback callback = (__, event) -> {
-                 if ( event.getResultCode() != 
KeeperException.Code.OK.intValue() ) {
-                     client.runSafe(this::reset);
+                 if ( event.getResultCode() != 
KeeperException.Code.OK.intValue() )
+                 {
+                     reset();
+                 }
+                 else
+                 {
+                     resetListeners.forEach(Runnable::run);
                  }
              };
              client.addWatch().withMode(recursive ? 
AddWatchMode.PERSISTENT_RECURSIVE : 
AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..ae7a1d8
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from 
TestPathChildrenCacheInCluster
+    {
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test", 
RECURSIVE))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, 
newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___) -> 
latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = 
cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly 
copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.build(client, storage, 
"/test", RECURSIVE))
+            {
+                
cache.listenable().addListener(CuratorCacheListener.forChanges((__, ___) -> 
updatedLatch.countDown()));
+                
cache.listenable().addListener(CuratorCacheListener.forCreates(__ -> 
addedLatch.countDown()));
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something 
new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..dcb1ff3
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+/**
+ * Randomly create nodes in a tree while a set of CuratorCaches listens. 
Afterwards, validate
+ * that the caches contain the same values as ZK itself
+ */
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheConsistency extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Timing2 timing = new Timing2();
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = 
Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private static class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+        private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+        Client(int index, String connectionString, AtomicReference<Exception> 
errorSignal)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.build(client, 
CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE);
+            cache.setExceptionHandler(errorSignal::set);
+
+            // listenerDataMap is a local data map that will hold values sent 
by listeners
+            // this way, the listener code can be tested for validity and 
consistency
+            cache.listenable().addListener(CuratorCacheListener.forAll(
+                // creates
+                node -> {
+                    ChildData previous = listenerDataMap.put(node.getPath(), 
node);
+                    if ( previous != null )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: 
%d - Create for existing node: %s", index, node.getPath())));
+                    }
+                },
+
+                // changes
+                (oldNode, node) -> {
+                    ChildData previous = listenerDataMap.put(node.getPath(), 
node);
+                    if ( (previous == null) || 
!Arrays.equals(previous.getData(), oldNode.getData()) )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: 
%d - Bad old value for change node: %s", index, node.getPath())));
+                    }
+                },
+
+                // deletes
+                node -> {
+                    ChildData previous = 
listenerDataMap.remove(node.getPath());
+                    if ( previous == null )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: 
%d - Delete for non-existent node: %s", index, node.getPath())));
+                    }
+                }
+            ));
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        AtomicReference<Exception> errorSignal = new AtomicReference<>();
+        try (TestingCluster cluster = new TestingCluster(clusterSize))
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty, errorSignal);
+                workLoop(cluster, clients, maxDepth, errorSignal);
+
+                log.info("Test complete - sleeping to allow events to 
complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client ->  findErrors(client, actualTree))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // build a data map recursively from the actual values in ZK
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> 
actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            client.getChildren().forPath(fromPath).forEach(child -> 
buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("", e);
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, 
AtomicReference<Exception> errorSignal)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString(), 
errorSignal))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    private void workLoop(TestingCluster cluster, List<Client> clients, int 
maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Exception errorSignalException = errorSignal.get();
+            if ( errorSignalException != null )
+            {
+                Assert.fail("A client's error handler was called", 
errorSignalException);
+            }
+
+            Duration elapsedFromLastServerKill = 
Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random.nextInt(0, maxDepth);
+            String thisPath = randomPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            
client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, 
thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not create/set: " + thisPath);
+        }
+    }
+
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            
client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not delete: " + thisPath);
+        }
+    }
+
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) 
throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new 
ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random.nextInt(clients.size())).client;
+    }
+
+    private Map.Entry<Integer, List<String>> findErrors(Client client, 
Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: 
%d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.containsPath(path) )
+            {
+                errors.add(String.format("Path %s in master but not client", 
path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData())) )
+                {
+                    errors.add(String.format("Data at %s is not the same", 
data.getPath()));
+                }
+
+                ChildData listenersMapData = 
client.listenerDataMap.get(data.getPath());
+                if ( listenersMapData == null )
+                {
+                    errors.add(String.format("listenersMap missing data at: 
%s", data.getPath()));
+                }
+                else if ( !treeValue.equals(new 
String(listenersMapData.getData())) )
+                {
+                    errors.add(String.format("Data at %s in listenersMap is 
not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", 
data.getPath()));
+            }
+        });
+
+        client.listenerDataMap.keySet().forEach(path -> {
+            if ( !storage.containsPath(path) )
+            {
+                errors.add(String.format("Path %s in listenersMap but not 
storage", path));
+            }
+        });
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    private String randomPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random.nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing - we'll be using TestingCluster instead
+    }
+
+    private static CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 
100);
+        return CuratorFrameworkFactory.newClient(connectionString, 
timing.session(), timing.connection(), retryPolicy);
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..915e6aa
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+public class TestCuratorCacheEventOrdering extends 
TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        return cache.storage().size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, 
BlockingQueue<Event> events)
+    {
+        CuratorCache cache = CuratorCache.build(client, path, 
CuratorCache.Options.RECURSIVE);
+        cache.listenable().addListener((oldNode, node) -> {
+            if ( (oldNode == null) && (node != null) )
+            {
+                events.add(new Event(EventType.ADDED, node.getPath()));
+            }
+            else if ( (oldNode != null) && (node == null) )
+            {
+                events.add(new Event(EventType.DELETED, oldNode.getPath()));
+            }
+        });
+        cache.start();
+        return cache;
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..95f8fbc
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheWrappers extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from 
TestPathChildrenCache#testBasics()
+    {
+        try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new 
LinkedBlockingQueue<>();
+            try ( CuratorCache cache = CuratorCache.build(client, "/test", 
RECURSIVE) )
+            {
+                PathChildrenCacheListener listener = (__, event) -> {
+                    if ( event.getData().getPath().equals("/test/one") )
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                
cache.listenable().addListener(CuratorCacheListener.wrap(client, listener));
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new 
String(cache.storage().get("/test/one").orElse(null).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    @Test
+    public void testTreeCache() throws Exception    // copied from 
TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();
+        try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try ( CuratorCache cache = CuratorCache.build(client, "/test", 
RECURSIVE) )
+            {
+                
cache.listenable().addListener(CuratorCacheListener.wrap(client, 
treeCacheBase.eventListener));
+                cache.start();
+
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, 
"/test");
+                // assertEvent(TreeCacheEvent.Type.INITIALIZED); CuratorCache 
doesn't support this
+                
Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(),
 ImmutableSet.of());
+                
Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0);
+                
Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(),
 0);
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, 
"/test/one");
+                
Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(),
 ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new 
String(cache.storage().get("/test/one").orElse(null).getData()), "hey there");
+                
Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(),
 ImmutableSet.of());
+                
Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 
0);
+                
Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(),
 0);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, 
"/test/one");
+                
Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(),
 ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new 
String(cache.storage().get("/test/one").orElse(null).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, 
"/test/one", "sup!".getBytes());
+                
Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(),
 ImmutableSet.of());
+            }
+        }
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..d595e3c
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestWrappedNodeCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", 
"one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo", RECURSIVE);
+            NodeCacheListener listener = semaphore::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", 
"start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) 
-> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node", RECURSIVE);
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            
Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "new 
data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node", RECURSIVE);
+            cache.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            Assert.assertNull(cache.getRootData().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), 
"a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), 
"b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(cache.getRootData().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..b3d5701 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
@@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentWatcher extends BaseClassForTests
 {
     private final Timing2 timing = new Timing2();
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java 
b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws 
Exception
     {
-        Method              m = 
client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = 
ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, 
AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, 
AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters 
filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher 
watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, 
ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new 
AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, 
recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file

Reply via email to