This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 20acdac9b1e2979236823997b832fefe24d56a16
Author: randgalt <randg...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, 
and for earlier versions creates a TreeCache. The curator-test-zk35 module 
ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 113 ++++++++++++++++
 .../recipes/cache/CuratorCacheBridge.java          |  33 +++--
 .../recipes/cache/CuratorCacheBuilder.java         |   7 +
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  21 ++-
 .../framework/recipes/cache/CuratorCacheImpl.java  |   2 +-
 .../cache/PathChildrenCacheListenerWrapper.java    |  11 +-
 .../curator/framework/recipes/cache/TreeCache.java |  36 ++++--
 curator-test-zk35/pom.xml                          |  46 +++++++
 curator-x-async/pom.xml                            |  15 +++
 .../details/CachedModeledFrameworkImpl.java        |  11 +-
 .../x/async/modeled/details/ModeledCacheImpl.java  |  50 +++++--
 .../modeled/details/ModeledFrameworkImpl.java      |   4 +-
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../x/async/modeled/TestModeledFrameworkBase.java  |   4 +-
 curator-x-discovery/pom.xml                        |  15 +++
 .../apache/curator/x/discovery/ServiceCache.java   |  20 ++-
 .../curator/x/discovery/ServiceProvider.java       |  24 +++-
 .../discovery/details/ServiceCacheBuilderImpl.java |   6 +-
 .../x/discovery/details/ServiceCacheImpl.java      | 143 ++++++++++++---------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  66 +++++-----
 .../x/discovery/details/ServiceProviderImpl.java   |   8 ++
 .../curator/x/discovery/TestServiceCache.java      |   2 +
 .../x/discovery/details/TestServiceDiscovery.java  |   2 +
 .../x/discovery/details/TestWatchedInstances.java  |   2 +
 pom.xml                                            |  14 ++
 25 files changed, 497 insertions(+), 160 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..a09196f
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, 
TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> 
listenerManager = StandardListenerManager.standard();
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, 
CuratorCache.Options[] optionsArg, Executor executor)
+    {
+        Set<CuratorCache.Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, 
path).setCacheData(false);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executor != null )
+        {
+            builder.setExecutor(executor);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
similarity index 54%
copy from 
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
copy to 
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
index a122d69..7103877 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -16,29 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.x.discovery;
+package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.x.discovery.details.InstanceProvider;
-import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
-import java.util.List;
 
-public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListener>, InstanceProvider<T>
+/**
+ * A facade that uses {@link 
org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link 
org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+public interface CuratorCacheBridge extends Closeable
 {
     /**
-     * Return the current list of instances. NOTE: there is no guarantee of 
freshness. This is
-     * merely the last known list of instances. However, the list is updated 
via a ZooKeeper watcher
-     * so it should be fresh within a window of a second or two.
-     *
-     * @return the list
+     * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
      */
-    public List<ServiceInstance<T>> getInstances();
+    @Override
+    void close();
 
     /**
-     * The cache must be started before use
+     * Return the listener container so that listeners can be registered to be 
notified of changes to the cache
      *
-     * @throws Exception errors
+     * @return listener container
      */
-    public void start() throws Exception;
+    Listenable<CuratorCacheListener> listenable();
 }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..ab80a6f 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,11 @@ public interface CuratorCacheBuilder
      * @return new Curator Cache
      */
     CuratorCache build();
+
+    /**
+     * Return a new bridge cache based on the builder methods that have been 
called.
+     *
+     * @return new bridge cache
+     */
+    CuratorCacheBridge buildBridge();
 }
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..43491b1 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
-        return new CuratorCacheImpl(client, storage, path, options, executor, 
exceptionHandler);
+        return internalBuild();
+    }
+
+    @Override
+    public CuratorCacheBridge buildBridge()
+    {
+        Preconditions.checkArgument(storage == null, "Custom 
CuratorCacheStorage is not supported by the TreeCache bridge");
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return internalBuild();
+        }
+        Preconditions.checkArgument(exceptionHandler == null, 
"ExceptionHandler is not supported by the TreeCache bridge");
+        return new CompatibleCuratorCacheBridge(client, path, options, 
executor);
+    }
+
+    private CuratorCacheImpl internalBuild()
+    {
+        return new CuratorCacheImpl(client, 
CuratorCacheStorage.bytesNotCached(), path, options, executor, 
exceptionHandler);
     }
 }
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 8916399..1e62a39 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -44,7 +44,7 @@ import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements 
CuratorCacheListener
         {
             case NODE_CREATED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
                 break;
             }
 
             case NODE_CHANGED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
                 break;
             }
 
             case NODE_DELETED:
             {
-                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
                 break;
             }
         }
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements 
CuratorCacheListener
     @Override
     public void initialized()
     {
-        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+        sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
     }
 
-    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    private void sendEvent(PathChildrenCacheEvent event)
     {
-        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
         try
         {
             listener.childEvent(client, event);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index e2f3a8b..121e72c 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final Executor executor;
     private final boolean createParentNodes;
     private final boolean disableZkWatches;
     private final TreeCacheSelector selector;
@@ -89,6 +91,7 @@ public class TreeCache implements Closeable
         private boolean cacheData = true;
         private boolean dataIsCompressed = false;
         private ExecutorService executorService = null;
+        private Executor executor = null;
         private int maxDepth = Integer.MAX_VALUE;
         private boolean createParentNodes = false;
         private boolean disableZkWatches = false;
@@ -105,12 +108,12 @@ public class TreeCache implements Closeable
          */
         public TreeCache build()
         {
-            ExecutorService executor = executorService;
-            if ( executor == null )
+            ExecutorService localExecutorService = executorService;
+            if ( (localExecutorService == null) && (executor == null) )
             {
-                executor = 
Executors.newSingleThreadExecutor(defaultThreadFactory);
+                localExecutorService = 
Executors.newSingleThreadExecutor(defaultThreadFactory);
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, 
maxDepth, executor, createParentNodes, disableZkWatches, selector);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, 
maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, 
selector);
         }
 
         /**
@@ -149,6 +152,15 @@ public class TreeCache implements Closeable
         }
 
         /**
+         * Sets the executor to publish events; a default executor will be 
created if not specified.
+         */
+        public Builder setExecutor(Executor executor)
+        {
+            this.executor = checkNotNull(executor);
+            return this;
+        }
+
+        /**
          * Sets the maximum depth to explore/watch.  A {@code maxDepth} of 
{@code 0} will watch only
          * the root node (like {@link NodeCache}); a {@code maxDepth} of 
{@code 1} will watch the
          * root node and its immediate children (kind of like {@link 
PathChildrenCache}.
@@ -564,7 +576,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new 
DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, 
new DefaultTreeCacheSelector());
     }
 
     /**
@@ -573,12 +585,14 @@ public class TreeCache implements Closeable
      * @param cacheData        if true, node contents are cached in addition 
to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the 
TreeCache's background thread
+     * @param executor          executor to use for the TreeCache's background 
thread
      * @param createParentNodes true to create parent nodes as containers
      * @param disableZkWatches true to disable Zookeeper watches
      * @param selector         the selector to use
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean 
dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean 
createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean 
dataIsCompressed, int maxDepth, final ExecutorService executorService, final 
Executor executor, boolean createParentNodes, boolean disableZkWatches, 
TreeCacheSelector selector)
     {
+        this.executor = executor;
         this.createParentNodes = createParentNodes;
         this.selector = Preconditions.checkNotNull(selector, "selector cannot 
be null");
         this.root = new TreeNode(validatePath(path), null);
@@ -588,7 +602,7 @@ public class TreeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
-        this.executorService = Preconditions.checkNotNull(executorService, 
"executorService cannot be null");
+        this.executorService = executorService;
     }
 
     /**
@@ -623,7 +637,10 @@ public class TreeCache implements Closeable
             client.removeWatchers();
             
client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
-            executorService.shutdown();
+            if ( executorService != null )
+            {
+                executorService.shutdown();
+            }
             try
             {
                 root.wasDeleted();
@@ -857,8 +874,9 @@ public class TreeCache implements Closeable
     {
         if ( treeState.get() != TreeState.CLOSED )
         {
+            Executor localExecutor = (executorService != null) ? 
executorService : executor;
             LOG.debug("publishEvent: {}", event);
-            executorService.submit(new Runnable()
+            localExecutor.execute(new Runnable()
             {
                 @Override
                 public void run()
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 5803893..4ca8356 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -114,6 +126,32 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>
@@ -124,6 +162,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -151,6 +195,8 @@
                     <dependenciesToScan>
                         
<dependency>org.apache.curator:curator-framework</dependency>
                         
<dependency>org.apache.curator:curator-recipes</dependency>
+                        
<dependency>org.apache.curator:curator-x-async</dependency>
+                        
<dependency>org.apache.curator:curator-x-discovery</dependency>
                     </dependenciesToScan>
                     <groups>zk35,zk35Compatibility</groups>
                     <excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
-    private final Executor executor;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService 
executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor));
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, 
ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, 
ModeledCacheImpl<T> cache)
     {
         this.client = client;
         this.cache = cache;
-        this.executor = executor;
     }
 
     @Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> child(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache);
     }
 
     @Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
     }
 
     @Override
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..54f01c1 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer 
= new ListenerContainer<>();
     private final ZPath basePath;
+    private final CuratorFramework client;
+    private final Set<CreateOption> options;
 
     private static final class Entry<T>
     {
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, 
ExecutorService executor)
     {
+        this.client = client;
         if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && 
modelSpec.path().parent().isResolved() )
         {
             modelSpec = modelSpec.parent(); // i.e. the last item is a 
parameter
@@ -69,19 +78,39 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded)
 || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        options = modelSpec.createOptions();
+        CuratorCacheBuilder builder = CuratorCache.builder(client, 
basePath.fullPath());
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        if ( executor != null )
+        {
+            builder.withExecutor(executor);
+        }
+        cache = builder.buildBridge();
     }
 
     public void start()
     {
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forTreeCache(client, this)
+            .build();
         try
         {
-            cache.getListenable().addListener(this);
+            if ( options.contains(CreateOption.createParentsIfNeeded) )
+            {
+                if ( options.contains(CreateOption.createParentsAsContainers) )
+                {
+                    new EnsureContainers(client, basePath.fullPath()).ensure();
+                }
+                else
+                {
+                    ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), 
basePath.fullPath(), false, null, false);
+                }
+            }
+
+            cache.listenable().addListener(listener);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +121,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index dbbf3cb..799845f 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
     @Override
     public CachedModeledFramework<T> cached()
     {
-        return 
cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+        return cached(null);
     }
 
     @Override
     public CachedModeledFramework<T> cached(ExecutorService executor)
     {
         Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be 
used with watched instances as the internal cache would bypass the watchers.");
-        return new CachedModeledFrameworkImpl<>(this, 
Objects.requireNonNull(executor, "executor cannot be null"));
+        return new CachedModeledFrameworkImpl<>(this, null);
     }
 
     @Override
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests
     protected ModelSpec<TestNewerModel> newModelSpec;
     protected AsyncCuratorFramework async;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..270005e 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import 
org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
-     * The cache must be started before use
+     * The cache must be started before use. This method blocks while the 
internal
+     * cache is loaded.
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The cache must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index f542ed3..d09885b 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -23,6 +23,7 @@ import 
org.apache.curator.x.discovery.details.InstanceProvider;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * The main API for Discovery. This class is essentially a facade over a 
{@link ProviderStrategy}
@@ -31,11 +32,24 @@ import java.util.Collection;
 public interface ServiceProvider<T> extends Closeable
 {
     /**
-     * The provider must be started before use
+     * The provider must be started before use. This method blocks while the 
internal
+     * cache is loaded.
      *
      * @throws Exception any errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The provider must be started before use. This version returns 
immediately.
+     * Use the returned latch to block until the cache is loaded
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 
     /**
      * Return an instance for a single use. <b>IMPORTANT: </b> users
@@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return the instance to use
      * @throws Exception any errors
      */
-    public ServiceInstance<T> getInstance() throws Exception;
+    ServiceInstance<T> getInstance() throws Exception;
 
     /**
      * Return the current available set of instances <b>IMPORTANT: </b> users
@@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return all known instances
      * @throws Exception any errors
      */
-    public Collection<ServiceInstance<T>> getAllInstances() throws Exception;
+    Collection<ServiceInstance<T>> getAllInstances() throws Exception;
 
     /**
      * Take note of an error connecting to the given instance. The instance 
will potentially
@@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable
      *
      * @param instance instance that had an error
      */
-    public void noteError(ServiceInstance<T> instance);
+    void noteError(ServiceInstance<T> instance);
 
     /**
      * Close the provider. Note: it's the provider's responsibility to close 
any caches it manages
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..501e289 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        if (executorService != null)
+        if (threadFactory != null)
         {
-            return new ServiceCacheImpl<T>(discovery, name, executorService);
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
         }
         else
         {
-            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
         }
     }
 
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..a752ead 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,20 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +52,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           
listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = 
new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances 
= Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = 
new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
+    private final CuratorCacheBridge cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = 
Maps.newConcurrentMap();
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+    private String path;
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory 
threadFactory)
@@ -73,12 +80,20 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(executorService, "executorService cannot be 
null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), 
discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        path = discovery.pathForName(name);
+
+        CuratorCacheBuilder builder = 
CuratorCache.builder(discovery.getClient(), path);
+        if ( executorService != null )
+        {
+            builder.withExecutor(executorService::submit);
+        }
+        cache = builder.buildBridge();
+
+        CuratorCacheListener listener = 
CuratorCacheListener.builder().forPathChildrenCache(discovery.getClient(), 
this).forInitialized(this::initialized).build();
+        cache.listenable().addListener(listener);
     }
 
     @Override
@@ -94,9 +109,17 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        new EnsureContainers(discovery.getClient(), path).ensure();
+
+        cache.start();
         if ( debugStartLatch != null )
         {
             debugStartLatch.countDown();
@@ -108,14 +131,7 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by 
the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
@@ -123,18 +139,15 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, 
State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        
discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                
discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +179,42 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
-                addInstance(event.getData(), false);
+                addInstance(event.getData());
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
 
-            case CHILD_REMOVED:
+        case CHILD_REMOVED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
                 instances.remove(instanceIdFromData(event.getData()));
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, 
Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -207,18 +223,23 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws 
Exception
+    private void addInstance(ChildData childData)
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), 
childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..12b81f4 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +94,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     private static class Entry<T>
     {
         private volatile ServiceInstance<T> service;
-        private volatile NodeCache cache;
+        private volatile CuratorCacheBridge cache;
 
         private Entry(ServiceInstance<T> service)
         {
@@ -277,8 +279,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        return new ServiceCacheBuilderImpl<T>(this)
-            .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+        return new ServiceCacheBuilderImpl<T>(this);
     }
 
     /**
@@ -458,52 +459,47 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         }
     }
 
-    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+    private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
     {
         if ( !watchInstances )
         {
             return null;
         }
 
-        final NodeCache nodeCache = new NodeCache(client, 
pathForInstance(instance.getName(), instance.getId()));
-        try
-        {
-            nodeCache.start(true);
-        }
-        catch ( InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-        catch ( Exception e )
-        {
-            log.error("Could not start node cache for: " + instance, e);
-        }
-        NodeCacheListener listener = new NodeCacheListener()
-        {
-            @Override
-            public void nodeChanged() throws Exception
-            {
-                if ( nodeCache.getCurrentData() != null )
+        CuratorCacheBridge cache = CuratorCache.builder(client, 
pathForInstance(instance.getName(), instance.getId()))
+            .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+            .buildBridge();
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .afterInitialized()
+            .forAll((__, ___, data) -> {
+                if ( data != null )
                 {
-                    ServiceInstance<T> newInstance = 
serializer.deserialize(nodeCache.getCurrentData().getData());
-                    Entry<T> entry = services.get(newInstance.getId());
-                    if ( entry != null )
+                    try
                     {
-                        synchronized(entry)
+                        ServiceInstance<T> newInstance = 
serializer.deserialize(data.getData());
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            entry.service = newInstance;
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
+                    catch ( Exception e )
+                    {
+                        log.debug("Could not deserialize: " + data.getPath());
+                    }
                 }
                 else
                 {
                     log.warn("Instance data has been deleted for: " + 
instance);
                 }
-            }
-        };
-        nodeCache.getListenable().addListener(listener);
-        return nodeCache;
+            })
+            .build();
+        cache.listenable().addListener(listener);
+        cache.start();
+        return cache;
     }
 
     private void internalUnregisterService(final Entry<T> entry) throws 
Exception
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..a411f33 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements 
ServiceProvider<T>
         discovery.providerOpened(this);
     }
 
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
+        discovery.providerOpened(this);
+        return cache.startImmediate();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..d86e193 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceDiscovery extends BaseClassForTests
 {
     private static final Comparator<ServiceInstance<Void>> comparator = new 
Comparator<ServiceInstance<Void>>()
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
index 2d03c47..a96ff6f 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -35,6 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestWatchedInstances extends BaseClassForTests
 {
     @Test
diff --git a/pom.xml b/pom.xml
index e223de6..00083c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-discovery</artifactId>
+                <version>${project.version}</version>
             </dependency>
 
             <dependency>
@@ -400,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-async</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-math</artifactId>
                 <version>${commons-math-version}</version>

Reply via email to