This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 3043edf005406b56951b43e591e9cc5d79496566
Author: randgalt <randg...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0, creates a CuratorCache, 
and for earlier versions creates a TreeCache. The curator-test-zk35 module 
ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 113 +++++++++++++++++
 .../recipes/cache/CuratorCacheBridge.java          |  33 ++---
 .../recipes/cache/CuratorCacheBuilder.java         |   7 ++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  19 +++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   2 +-
 .../recipes/cache/CuratorCacheStorage.java         |   6 +-
 .../cache/PathChildrenCacheListenerWrapper.java    |  11 +-
 .../curator/framework/recipes/cache/TreeCache.java |  36 ++++--
 curator-test-zk35/pom.xml                          |  46 +++++++
 curator-x-async/pom.xml                            |  15 +++
 .../x/async/modeled/details/ModeledCacheImpl.java  |  46 +++++--
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../x/async/modeled/TestModeledFrameworkBase.java  |   4 +-
 curator-x-discovery/pom.xml                        |  15 +++
 .../apache/curator/x/discovery/ServiceCache.java   |  17 ++-
 .../x/discovery/details/ServiceCacheImpl.java      | 133 +++++++++++----------
 .../curator/x/discovery/TestServiceCache.java      |   2 +
 pom.xml                                            |  14 +++
 18 files changed, 413 insertions(+), 108 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..929c815
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, 
TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> 
listenerManager = StandardListenerManager.standard();
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, 
CuratorCache.Options[] optionsArg, Executor executor)
+    {
+        Set<CuratorCache.Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executor != null )
+        {
+            builder.setExecutor(executor);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
similarity index 54%
copy from 
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
copy to 
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
index a122d69..7103877 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -16,29 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.x.discovery;
+package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.x.discovery.details.InstanceProvider;
-import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
-import java.util.List;
 
-public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListener>, InstanceProvider<T>
+/**
+ * A facade that uses {@link 
org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link 
org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+public interface CuratorCacheBridge extends Closeable
 {
     /**
-     * Return the current list of instances. NOTE: there is no guarantee of 
freshness. This is
-     * merely the last known list of instances. However, the list is updated 
via a ZooKeeper watcher
-     * so it should be fresh within a window of a second or two.
-     *
-     * @return the list
+     * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
      */
-    public List<ServiceInstance<T>> getInstances();
+    @Override
+    void close();
 
     /**
-     * The cache must be started before use
+     * Return the listener container so that listeners can be registered to be 
notified of changes to the cache
      *
-     * @throws Exception errors
+     * @return listener container
      */
-    public void start() throws Exception;
+    Listenable<CuratorCacheListener> listenable();
 }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..ab80a6f 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,11 @@ public interface CuratorCacheBuilder
      * @return new Curator Cache
      */
     CuratorCache build();
+
+    /**
+     * Return a new bridge cache based on the builder methods that have been 
called.
+     *
+     * @return new bridge cache
+     */
+    CuratorCacheBridge buildBridge();
 }
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..6445187 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
+        return internalBuild();
+    }
+
+    @Override
+    public CuratorCacheBridge buildBridge()
+    {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return internalBuild();
+        }
+        Preconditions.checkArgument(exceptionHandler == null, 
"ExceptionHandler is not supported by the TreeCache bridge");
+        Preconditions.checkArgument(storage == null, "Custom 
CuratorCacheStorage is not supported by the TreeCache bridge");
+        return new CompatibleCuratorCacheBridge(client, path, options, 
executor);
+    }
+
+    private CuratorCacheImpl internalBuild()
+    {
         return new CuratorCacheImpl(client, storage, path, options, executor, 
exceptionHandler);
     }
 }
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 8916399..1e62a39 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -44,7 +44,7 @@ import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index f3d870d..e809263 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -34,7 +34,8 @@ public interface CuratorCacheStorage
      *
      * @return storage instance
      */
-    static CuratorCacheStorage standard() {
+    static CuratorCacheStorage standard()
+    {
         return new StandardCuratorCacheStorage(true);
     }
 
@@ -44,7 +45,8 @@ public interface CuratorCacheStorage
      *
      * @return storage instance that does not retain data bytes
      */
-    static CuratorCacheStorage bytesNotCached() {
+    static CuratorCacheStorage bytesNotCached()
+    {
         return new StandardCuratorCacheStorage(false);
     }
 
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements 
CuratorCacheListener
         {
             case NODE_CREATED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
                 break;
             }
 
             case NODE_CHANGED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
                 break;
             }
 
             case NODE_DELETED:
             {
-                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
                 break;
             }
         }
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements 
CuratorCacheListener
     @Override
     public void initialized()
     {
-        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+        sendEvent(new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
     }
 
-    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    private void sendEvent(PathChildrenCacheEvent event)
     {
-        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
         try
         {
             listener.childEvent(client, event);
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..4badb30 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -75,6 +76,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final Executor executor;
     private final boolean createParentNodes;
     private final boolean disableZkWatches;
     private final TreeCacheSelector selector;
@@ -86,6 +88,7 @@ public class TreeCache implements Closeable
         private boolean cacheData = true;
         private boolean dataIsCompressed = false;
         private ExecutorService executorService = null;
+        private Executor executor = null;
         private int maxDepth = Integer.MAX_VALUE;
         private boolean createParentNodes = false;
         private boolean disableZkWatches = false;
@@ -102,12 +105,12 @@ public class TreeCache implements Closeable
          */
         public TreeCache build()
         {
-            ExecutorService executor = executorService;
-            if ( executor == null )
+            ExecutorService localExecutorService = executorService;
+            if ( (localExecutorService == null) && (executor == null) )
             {
-                executor = 
Executors.newSingleThreadExecutor(defaultThreadFactory);
+                localExecutorService = 
Executors.newSingleThreadExecutor(defaultThreadFactory);
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, 
maxDepth, executor, createParentNodes, disableZkWatches, selector);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, 
maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, 
selector);
         }
 
         /**
@@ -146,6 +149,15 @@ public class TreeCache implements Closeable
         }
 
         /**
+         * Sets the executor to publish events; a default executor will be 
created if not specified.
+         */
+        public Builder setExecutor(Executor executor)
+        {
+            this.executor = checkNotNull(executor);
+            return this;
+        }
+
+        /**
          * Sets the maximum depth to explore/watch.  A {@code maxDepth} of 
{@code 0} will watch only
          * the root node (like {@link NodeCache}); a {@code maxDepth} of 
{@code 1} will watch the
          * root node and its immediate children (kind of like {@link 
PathChildrenCache}.
@@ -561,7 +573,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new 
DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, 
new DefaultTreeCacheSelector());
     }
 
     /**
@@ -570,12 +582,14 @@ public class TreeCache implements Closeable
      * @param cacheData        if true, node contents are cached in addition 
to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the 
TreeCache's background thread
+     * @param executor          executor to use for the TreeCache's background 
thread
      * @param createParentNodes true to create parent nodes as containers
      * @param disableZkWatches true to disable Zookeeper watches
      * @param selector         the selector to use
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean 
dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean 
createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean 
dataIsCompressed, int maxDepth, final ExecutorService executorService, final 
Executor executor, boolean createParentNodes, boolean disableZkWatches, 
TreeCacheSelector selector)
     {
+        this.executor = executor;
         this.createParentNodes = createParentNodes;
         this.selector = Preconditions.checkNotNull(selector, "selector cannot 
be null");
         this.root = new TreeNode(validatePath(path), null);
@@ -585,7 +599,7 @@ public class TreeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
-        this.executorService = Preconditions.checkNotNull(executorService, 
"executorService cannot be null");
+        this.executorService = executorService;
     }
 
     /**
@@ -620,7 +634,10 @@ public class TreeCache implements Closeable
             client.removeWatchers();
             
client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
-            executorService.shutdown();
+            if ( executorService != null )
+            {
+                executorService.shutdown();
+            }
             try
             {
                 root.wasDeleted();
@@ -854,8 +871,9 @@ public class TreeCache implements Closeable
     {
         if ( treeState.get() != TreeState.CLOSED )
         {
+            Executor localExecutor = (executorService != null) ? 
executorService : executor;
             LOG.debug("publishEvent: {}", event);
-            executorService.submit(new Runnable()
+            localExecutor.execute(new Runnable()
             {
                 @Override
                 public void run()
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 5803893..4ca8356 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -114,6 +126,32 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>
@@ -124,6 +162,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -151,6 +195,8 @@
                     <dependenciesToScan>
                         
<dependency>org.apache.curator:curator-framework</dependency>
                         
<dependency>org.apache.curator:curator-recipes</dependency>
+                        
<dependency>org.apache.curator:curator-x-async</dependency>
+                        
<dependency>org.apache.curator:curator-x-discovery</dependency>
                     </dependenciesToScan>
                     <groups>zk35,zk35Compatibility</groups>
                     <excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..d79d66c 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer 
= new ListenerContainer<>();
     private final ZPath basePath;
+    private final CuratorFramework client;
+    private final Set<CreateOption> options;
 
     private static final class Entry<T>
     {
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, 
ExecutorService executor)
     {
+        this.client = client;
         if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && 
modelSpec.path().parent().isResolved() )
         {
             modelSpec = modelSpec.parent(); // i.e. the last item is a 
parameter
@@ -69,19 +78,35 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded)
 || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        options = modelSpec.createOptions();
+        CuratorCacheBuilder builder = CuratorCache.builder(client, 
basePath.fullPath());
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        cache = builder.withExecutor(executor).buildBridge();
     }
 
     public void start()
     {
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forTreeCache(client, this)
+            .build();
         try
         {
-            cache.getListenable().addListener(this);
+            if ( options.contains(CreateOption.createParentsIfNeeded) )
+            {
+                if ( options.contains(CreateOption.createParentsAsContainers) )
+                {
+                    new EnsureContainers(client, basePath.fullPath()).ensure();
+                }
+                else
+                {
+                    ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), 
basePath.fullPath(), false, null, false);
+                }
+            }
+
+            cache.listenable().addListener(listener);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +117,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests
     protected ModelSpec<TestNewerModel> newModelSpec;
     protected AsyncCuratorFramework async;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..ddae689 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -33,12 +34,22 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
-     * The cache must be started before use
+     * The cache must be started before use. This method blocks while the internal
+     * cache is loaded.
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The cache must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    CountDownLatch startImmediate() throws Exception;
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..c108713 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           
listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = 
new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances 
= Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = 
new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
+    private final CuratorCacheBridge cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = 
Maps.newConcurrentMap();
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+    private String path;
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory 
threadFactory)
@@ -77,8 +82,10 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), 
discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        path = discovery.pathForName(name);
+        cache = CuratorCache.builder(discovery.getClient(), 
path).withExecutor(executorService::submit).withStorage(CuratorCacheStorage.bytesNotCached()).buildBridge();
+        CuratorCacheListener listener = 
CuratorCacheListener.builder().forPathChildrenCache(discovery.getClient(), 
this).forInitialized(this::initialized).build();
+        cache.listenable().addListener(listener);
     }
 
     @Override
@@ -94,9 +101,17 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        new EnsureContainers(discovery.getClient(), path).ensure();
+
+        cache.start();
         if ( debugStartLatch != null )
         {
             debugStartLatch.countDown();
@@ -108,14 +123,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by 
the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
@@ -123,18 +131,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, 
State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        
discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                
discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +171,42 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
-                addInstance(event.getData(), false);
+                addInstance(event.getData());
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
 
-            case CHILD_REMOVED:
+        case CHILD_REMOVED:
+        {
+            if ( !event.getData().getPath().equals(path) )
             {
                 instances.remove(instanceIdFromData(event.getData()));
                 notifyListeners = true;
-                break;
             }
+            break;
+        }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, 
Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -207,18 +215,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws 
Exception
+    private void addInstance(ChildData childData)
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), 
childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
diff --git a/pom.xml b/pom.xml
index e223de6..00083c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-discovery</artifactId>
+                <version>${project.version}</version>
             </dependency>
 
             <dependency>
@@ -400,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-async</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-math</artifactId>
                 <version>${commons-math-version}</version>

Reply via email to