This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git

commit ac601997f817a1f646b3547b07394c00bdc622dc
Author: randgalt <[email protected]>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300

    wip
---
 .../framework/recipes/cache/CuratorCache.java      | 141 +++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 273 ++++++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        | 145 ++++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 117 ++++++++
 .../curator/framework/recipes/cache/NodeCache.java |   3 +
 .../recipes/cache/NodeCacheListenerWrapper.java    |  47 +++
 .../framework/recipes/cache/PathChildrenCache.java |   3 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  71 +++++
 .../recipes/cache/StandardCuratorCacheStorage.java |  98 +++++++
 .../curator/framework/recipes/cache/TreeCache.java |   3 +
 .../recipes/cache/TreeCacheListenerWrapper.java    |  71 +++++
 .../framework/recipes/watch/PersistentWatcher.java |  39 ++-
 .../framework/recipes/cache/TestCuratorCache.java  | 112 ++++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 317 +++++++++++++++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 165 +++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  20 ++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 ++
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 -----
 19 files changed, 1642 insertions(+), 78 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..44465c8
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. 
Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to 
update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified 
when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, 
always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Cache the entire tree of nodes starting at the given node
+         */
+        RECURSIVE,
+
+        /**
+         * Decompress data via {@link 
org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options using 
a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... 
options)
+    {
+        return build(client, CuratorCacheStorage.standard(), path, options);
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options and 
the given storage instance
+     *
+     * @param client Curator client
+     * @param storage storage to use
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, CuratorCacheStorage 
storage, String path, Options... options)
+    {
+        return new CuratorCacheImpl(client, storage, path, options);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc. Note: also calls 
{@link CuratorCacheStorage#close()}
+     */
+    @Override
+    void close();
+
+    /**
+     * Utility to force a rebuild of the cache. Normally, this should never 
be needed
+     */
+    void forceRebuild();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the root node being cached (i.e. the node passed to the builder)
+     *
+     * @return root node path
+     */
+    String getRootPath();
+
+    /**
+     * Convenience to return the root node data
+     *
+     * @return data (if it's in the cache)
+     */
+    default Optional<ChildData> getRootData()
+    {
+        return storage().get(getRootPath());
+    }
+
+    /**
+     * Return the listener container so that listeners can be registered to be 
notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * By default any unexpected exception is handled by logging the 
exception. You can change this
+     * so that a handler is called instead. Under normal circumstances, this 
shouldn't be necessary.
+     *
+     * @param newHandler new exception handler or {@code null} to reset to the 
default logging
+     */
+    void setExceptionHandler(Consumer<Exception> newHandler);
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..a31f841
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final StandardListenerManager<CuratorCacheListener> 
listenerManager = StandardListenerManager.standard();
+    private volatile Consumer<Exception> exceptionHandler;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, 
String path, Options... optionsArg)
+    {
+        Set<Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = storage;
+        this.path = path;
+        this.recursive = options.contains(Options.RECURSIVE);
+        this.compressedData = options.contains(Options.COMPRESSED_DATA);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::forceRebuild);
+        setExceptionHandler(null);
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Already started");
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            storage.close();
+        }
+    }
+
+    @Override
+    public void forceRebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    @Override
+    public String getRootPath()
+    {
+        return path;
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public void setExceptionHandler(Consumer<Exception> newHandler)
+    {
+        this.exceptionHandler = (newHandler != null) ? newHandler : e -> 
log.error("CuratorCache error", e);
+    }
+
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        switch ( event.getType() )
+        {
+            case NodeDataChanged:
+            case NodeCreated:
+            {
+                nodeChanged(event.getPath());
+                break;
+            }
+
+            case NodeDeleted:
+            {
+                removeStorage(event.getPath());
+                break;
+            }
+
+            case NodeChildrenChanged:
+            {
+                nodeChildrenChanged(event.getPath());
+                break;
+            }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> 
nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+            };
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), 
event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+            };
+            if ( compressedData )
+            {
+                
client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != 
data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.nodeChanged(previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.nodeChanged(null, data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> 
l.nodeChanged(previousData, null)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.runSafe(() -> listenerManager.forEach(proc));
+        }
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        
handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+    }
+
+    private void handleException(Exception e)
+    {
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..a66f6b8
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Listener for {@link CuratorCache} events
+ */
+@SuppressWarnings("deprecation")
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+        ;
+
+        public static Type get(ChildData oldNode, ChildData node)
+        {
+            if ( (oldNode != null) && (node != null) )
+            {
+                return NODE_CHANGED;
+            }
+            else if ( oldNode != null )
+            {
+                return NODE_DELETED;
+            }
+            else if ( node != null )
+            {
+                return NODE_CREATED;
+            }
+            throw new IllegalArgumentException("oldNode and node cannot both 
be null");
+        }
+    }
+
+    /**
+     * Called when a node is created, changes, or is deleted. You can get a descriptive
+     * type for the operation by calling {@link 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type#get(ChildData,
 ChildData)}
+     * with the {@code oldNode} and {@code node} arguments
+     *
+     * @param oldNode the old node or null
+     * @param node the new node or null
+     */
+    void nodeChanged(ChildData oldNode, ChildData node);
+
+    /**
+     * Utility - wrap the given listener so that it's only called when an 
event of any of the given
+     * types is received
+     *
+     * @param listener listener to wrap
+     * @param types types to listen for
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forTypes(CuratorCacheListener listener, 
Type... types)
+    {
+        Set<Type> typeSet = new HashSet<>(Arrays.asList(types));
+        return (oldNode, node) -> {
+            if ( typeSet.contains(Type.get(oldNode, node)) )
+            {
+                listener.nodeChanged(oldNode, node);
+            }
+        };
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the 
listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener 
container. Also note that CuratorCache
+     * behaves differently than {@link 
org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, 
PathChildrenCacheListener listener)
+    {
+        return new PathChildrenCacheListenerWrapper(listener, client);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the 
listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener 
container. Also note that CuratorCache
+     * behaves differently than {@link 
org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, 
TreeCacheListener listener)
+    {
+        return new TreeCacheListenerWrapper(client, listener);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link 
org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the 
listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener 
container. Also note that CuratorCache
+     * behaves differently than {@link 
org.apache.curator.framework.recipes.cache.NodeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(NodeCacheListener listener)
+    {
+        return new NodeCacheListenerWrapper(listener);
+    }
+
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..ca41d15
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link 
org.apache.curator.framework.recipes.cache.CuratorCache}
+ */
+public interface CuratorCacheStorage extends Closeable
+{
+    /**
+     * Return a new standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard() {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Return a new storage instance that does not retain the data bytes. i.e. 
ChildData objects
+     * returned by this storage will always return {@code null} for {@link 
ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached() {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Returns a new copy of this storage that does not clear its internal 
data when it is closed.
+     * Useful for retaining the cache after it is closed.
+     *
+     * @return new copy that does not clear on close
+     */
+    CuratorCacheStorage doNotClearOnClose();
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return true if the storage currently has an entry for the given path
+     *
+     * @param path path to check
+     * @return true/false
+     */
+    boolean containsPath(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage 
instance, the stream
+     * behaves like a stream returned by {@link 
java.util.concurrent.ConcurrentHashMap#entrySet()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+
+    /**
+     * Close the storage. For a standard storage instance, the storage is 
cleared.
+     */
+    @Override
+    default void close()
+    {
+        clear();
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..f730f78 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..89360aa
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+public class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    public NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldNode, ChildData node)
+    {
+        callListener();
+    }
+
+    private void callListener()
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..eb37936 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
 @SuppressWarnings("NullableProblems")
+@Deprecated
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..0393204
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    public PathChildrenCacheListenerWrapper(PathChildrenCacheListener 
listener, CuratorFramework client)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..5049770
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+/**
+ * {@link CuratorCacheStorage} implementation that keeps the cached nodes in a map
+ * keyed by node path.
+ */
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    /**
+     * Creates an empty storage backed by a {@link ConcurrentHashMap}.
+     *
+     * @param cacheBytes if {@code false}, entries are stored with {@code null} data bytes
+     */
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this(new ConcurrentHashMap<>(), cacheBytes);
+    }
+
+    private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, boolean cacheBytes)
+    {
+        this.dataMap = dataMap;
+        this.cacheBytes = cacheBytes;
+    }
+
+    /**
+     * @return a new storage that starts with a copy of the current entries and whose
+     *         {@code close()} does nothing
+     */
+    @Override
+    public CuratorCacheStorage doNotClearOnClose()
+    {
+        return new StandardCuratorCacheStorage(new ConcurrentHashMap<>(dataMap), cacheBytes)
+        {
+            @Override
+            public void close()
+            {
+                // NOP
+            }
+        };
+    }
+
+    /**
+     * Stores the node under its path.
+     *
+     * @param data the node to store
+     * @return the entry previously stored for the same path, if any
+     */
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData stored = data;
+        if ( !cacheBytes )
+        {
+            // keep path and stat only; the data bytes are dropped
+            stored = new ChildData(data.getPath(), data.getStat(), null);
+        }
+        ChildData previous = dataMap.put(data.getPath(), stored);
+        return Optional.ofNullable(previous);
+    }
+
+    /**
+     * @param path the node path to remove
+     * @return the removed entry, if one was stored
+     */
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        ChildData removed = dataMap.remove(path);
+        return Optional.ofNullable(removed);
+    }
+
+    /**
+     * @param path the node path to look up
+     * @return the stored entry, if any
+     */
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        ChildData found = dataMap.get(path);
+        return Optional.ofNullable(found);
+    }
+
+    /**
+     * @param path the node path to check
+     * @return {@code true} if an entry is stored for the path
+     */
+    @Override
+    public boolean containsPath(String path)
+    {
+        return dataMap.containsKey(path);
+    }
+
+    /**
+     * @return the number of stored entries
+     */
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    /**
+     * @return a stream over the stored entries
+     */
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    /**
+     * Removes every stored entry.
+     */
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e321eba 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static 
org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..0c3a75b
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Adapter that lets a {@link TreeCacheListener} be registered as a
+ * {@link CuratorCacheListener}: every cache notification is converted into a
+ * {@link TreeCacheEvent} and passed to the wrapped listener.
+ */
+public class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    /**
+     * @param client the client instance passed to the listener with each event
+     * @param listener the listener that receives the converted events
+     */
+    public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    /**
+     * Maps the cache change type to the matching tree event type and forwards it.
+     *
+     * @param oldData the previous node state
+     * @param node the new node state
+     */
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+            case NODE_CREATED:
+                forward(TreeCacheEvent.Type.NODE_ADDED, node);
+                break;
+
+            case NODE_CHANGED:
+                forward(TreeCacheEvent.Type.NODE_UPDATED, node);
+                break;
+
+            case NODE_DELETED:
+                // removals are reported with the previous node state
+                forward(TreeCacheEvent.Type.NODE_REMOVED, oldData);
+                break;
+        }
+    }
+
+    // Builds the event and calls the wrapped listener; any exception thrown by the
+    // listener is rethrown wrapped in a RuntimeException.
+    private void forward(TreeCacheEvent.Type eventType, ChildData data)
+    {
+        TreeCacheEvent treeEvent = new TreeCacheEvent(eventType, data);
+        try
+        {
+            listener.childEvent(client, treeEvent);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..40174fa 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
      private final Logger log = LoggerFactory.getLogger(getClass());
      private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
      private final StandardListenerManager<Watcher> listeners = 
StandardListenerManager.standard();
+     private final StandardListenerManager<Runnable> resetListeners = 
StandardListenerManager.standard();
      private final ConnectionStateListener connectionStateListener = (client, 
newState) -> {
          if ( newState.isConnected() )
          {
@@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference;
          return listeners;
      }
 
+     /**
+      * Returns the container for reset listeners. Listeners are called when the
+      * persistent watcher has been successfully registered or re-registered after
+      * a connection disruption, i.e. each time the background add-watch request
+      * issued by {@code reset()} completes with an OK result code.
+      *
+      * @return listener container
+      */
+     public StandardListenerManager<Runnable> getResetListenable()
+     {
+         return resetListeners;
+     }
+
      private void reset()
      {
          try
          {
              BackgroundCallback callback = (__, event) -> {
-                 if ( event.getResultCode() != 
KeeperException.Code.OK.intValue() ) {
-                     client.runSafe(this::reset);
+                 if ( event.getResultCode() != 
KeeperException.Code.OK.intValue() )
+                 {
+                     reset();
+                 }
+                 else
+                 {
+                     resetListeners.forEach(Runnable::run);
                  }
              };
              client.addWatch().withMode(recursive ? 
AddWatchMode.PERSISTENT_RECURSIVE : 
AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..3ffec11
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED;
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
+
+/**
+ * Integration tests for {@link CuratorCache} built with the {@code RECURSIVE} option.
+ */
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    /**
+     * Creates three children under {@code /test} in a 3-instance cluster, kills one
+     * server, waits for the client to report RECONNECTED and then checks that the
+     * cache storage holds four entries.
+     */
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from 
TestPathChildrenCacheInCluster
+    {
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test", 
RECURSIVE))
+                {
+                    cache.start();
+
+                    // released once the client reports RECONNECTED
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, 
newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    // counts cache notifications; three are awaited below
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___) -> 
latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    // presumably the instance this client is currently connected to - TODO confirm
+                    InstanceSpec connectionInstance = 
cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    // presumably /test itself plus the three children created above
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    /**
+     * Uses a storage constructed with {@code cacheBytes == false} and checks that
+     * both a NODE_CREATED and a NODE_CHANGED notification are still delivered.
+     */
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly 
copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.build(client, storage, 
"/test", RECURSIVE))
+            {
+                // listeners are registered before the cache is started
+                
cache.listenable().addListener(CuratorCacheListener.forTypes((__, ___) -> 
updatedLatch.countDown(), NODE_CHANGED));
+                
cache.listenable().addListener(CuratorCacheListener.forTypes((__, ___) -> 
addedLatch.countDown(), NODE_CREATED));
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something 
new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..7a6c964
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+/**
+ * Randomized simulation test for {@link CuratorCache}. Several clients, each with its
+ * own recursive cache on {@code BASE_PATH}, randomly create/update and delete nodes
+ * while cluster servers are killed. When the simulation ends, the storage of every
+ * client's cache is compared with the tree read directly from the cluster.
+ */
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheConsistency extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final Timing2 timing = new Timing2();
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    // explicit charset so node data is encoded and decoded the same way on every platform
+    private static final java.nio.charset.Charset DATA_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+    /**
+     * A Curator client paired with a cache on {@code BASE_PATH}. The cache storage is
+     * built with {@code doNotClearOnClose()} and is inspected by the test after the
+     * client has been closed.
+     */
+    private static class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+
+        Client(int index, String connectionString)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.build(client, CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE);
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    /**
+     * Runs the simulation with a random number of clients and a random maximum tree
+     * depth, then fails if any client's cache differs from the tree in the cluster.
+     */
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random().nextInt(10, 20);
+        int maxDepth = random().nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        try ( TestingCluster cluster = new TestingCluster(clusterSize) )
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty);
+                workLoop(cluster, clients, maxDepth);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        // keep only the clients that reported at least one discrepancy
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client -> findErrors(client.index, actualTree, client.cache.storage()))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // ThreadLocalRandom must be obtained on the thread that uses it; an instance cached
+    // in a field may end up being used from a different thread than the one that fetched it
+    private static ThreadLocalRandom random()
+    {
+        return ThreadLocalRandom.current();
+    }
+
+    // Reads the whole tree under BASE_PATH from the cluster as path -> data string
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    // Records the data at fromPath, then recurses into each of its children
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes, DATA_CHARSET));
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("", e);
+        }
+    }
+
+    // Creates and starts clientQty clients in index order. A plain loop is used because
+    // starting the client is a required side effect, which Stream.peek() is not meant for.
+    private List<Client> buildClients(TestingCluster cluster, int clientQty)
+    {
+        List<Client> clients = new ArrayList<>(clientQty);
+        for ( int index = 0; index < clientQty; ++index )
+        {
+            Client client = new Client(index, cluster.getConnectString());
+            client.start();
+            clients.add(client);
+        }
+        return clients;
+    }
+
+    // Creates BASE_PATH with empty data before any cache is started
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes(DATA_CHARSET));
+        }
+    }
+
+    // Until testLength has elapsed: pick a random path and a random client, then either
+    // delete or create/update the path. A server is killed each time a third of the test
+    // length has passed, up to maxServerKills times.
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random().nextInt(0, maxDepth);
+            String thisPath = buildPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random().nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    // Creates the node (and parents) or overwrites its data with a random number string
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random().nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes(DATA_CHARSET));
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along so the failure report shows why the operation failed
+            Assert.fail("Could not create/set: " + thisPath, e);
+        }
+    }
+
+    // Deletes the node and its children; BASE_PATH itself is never deleted
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along so the failure report shows why the operation failed
+            Assert.fail("Could not delete: " + thisPath, e);
+        }
+    }
+
+    // Kills the cluster instance at position serverKillIndex of the instance collection
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random().nextInt(clients.size())).client;
+    }
+
+    // Compares one client's cache storage with the tree read from the cluster and
+    // returns (client index, list of discrepancy messages)
+    private Map.Entry<Integer, List<String>> findErrors(int index, Map<String, String> tree, CuratorCacheStorage storage)
+    {
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.containsPath(path) )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData(), DATA_CHARSET)) )
+                {
+                    errors.add(String.format("Data at %s is not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+            }
+        });
+        return new AbstractMap.SimpleEntry<>(index, errors);
+    }
+
+    // Builds BASE_PATH followed by 'depth' random child names in [0, nodesPerLevel)
+    private String buildPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random().nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing
+    }
+
+    private static CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..d595e3c
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+/**
+ * Tests a {@link CuratorCache} observed through a {@link NodeCacheListener} that has
+ * been wrapped with {@code CuratorCacheListener.wrap()}. In every test the listener
+ * releases one semaphore permit per notification.
+ */
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestWrappedNodeCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    /**
+     * Deletes and re-creates the cached node and checks that the root data reflects
+     * the re-created node's bytes.
+     */
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", 
"one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo", RECURSIVE);
+            NodeCacheListener listener = semaphore::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            // first notification after start
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"one".getBytes());
+
+            // one notification is awaited for the delete and one for the re-create
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Injects a session expiration, waits for the LOST connection state, checks that
+     * the cached root data is unchanged, then updates the node and checks that the
+     * new data reaches the cache.
+     */
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", 
"start".getBytes());
+
+            // released once the client reports the LOST state
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) 
-> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node", RECURSIVE);
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            
Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            // the cached value must still be available after the session was lost
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), 
"start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "new 
data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Starts the cache on a node that does not exist yet, then creates, updates and
+     * deletes the node, checking the cached root data after each notification.
+     */
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node", RECURSIVE);
+            cache.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            
cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            // /test/node has not been created yet, so no root data is expected
+            Assert.assertNull(cache.getRootData().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), 
"a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), 
"b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(cache.getRootData().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..b3d5701 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
@@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentWatcher extends BaseClassForTests
 {
     private final Timing2 timing = new Timing2();
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java 
b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws 
Exception
     {
-        Method              m = 
client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = 
ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, 
AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, 
AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters 
filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher 
watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, 
ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new 
AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, 
recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file

Reply via email to