This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit e388abd6c86a13f1debed01b7bf3934dd3ecd3e9
Author: randgalt <randg...@apache.org>
AuthorDate: Fri Mar 20 14:48:18 2020 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, 
and for earlier versions creates a TreeCache. The curator-test-zk35 module 
ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 131 +++++++++++++++++++++
 .../framework/recipes/cache/CuratorCache.java      |  15 +++
 .../recipes/cache/CuratorCacheBridge.java          |  58 +++++++++
 .../recipes/cache/CuratorCacheBridgeBuilder.java   |  57 +++++++++
 .../cache/CuratorCacheBridgeBuilderImpl.java       |  76 ++++++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   8 +-
 .../framework/recipes/nodes/GroupMember.java       |  28 ++---
 .../framework/recipes/nodes/TestGroupMember.java   |   2 +
 .../x/async/modeled/details/ModeledCacheImpl.java  |  65 +++++-----
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../apache/curator/x/discovery/ServiceCache.java   |   3 +
 .../curator/x/discovery/ServiceCacheBuilder.java   |  10 --
 .../x/discovery/ServiceProviderBuilder.java        |  24 ++--
 .../discovery/details/ServiceCacheBuilderImpl.java |  24 +---
 .../x/discovery/details/ServiceCacheImpl.java      |  83 +++++++------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  66 +++++------
 .../details/ServiceProviderBuilderImpl.java        |   9 +-
 .../x/discovery/details/ServiceProviderImpl.java   |  24 +---
 .../x/discovery/ServiceCacheLeakTester.java        |   3 +
 .../curator/x/discovery/TestServiceCache.java      |   8 ++
 .../x/discovery/details/TestServiceCacheRace.java  |   2 +
 .../x/discovery/details/TestServiceDiscovery.java  |   2 +
 .../details/TestServiceDiscoveryBuilder.java       |   6 +-
 .../x/discovery/details/TestServiceProvider.java   |   2 +
 src/site/confluence/breaking-changes.confluence    |   3 +
 25 files changed, 526 insertions(+), 185 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..5dd71fd
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Stream;
+
+import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+/**
+ * Version of CuratorCacheBridge for pre-ZK 3.6 - uses TreeCache instead of 
CuratorCache
+ */
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, 
TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> 
listenerManager = StandardListenerManager.standard();
+    private final String path;
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, 
CuratorCache.Options[] optionsArg, ExecutorService executorService, boolean 
cacheData)
+    {
+        this.path = path;
+        Set<CuratorCache.Options> options = (optionsArg != null) ? 
Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, 
path).setCacheData(cacheData);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executorService != null )
+        {
+            builder.setExecutor(executorService);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public Stream<ChildData> streamRootChildren()
+    {
+        Map<String, ChildData> currentChildren = 
cache.getCurrentChildren(path);
+        if ( currentChildren == null )
+        {
+            return Stream.empty();
+        }
+        return currentChildren.values().stream();
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> 
listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
index 7ebf56d..e1fcca9 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -89,6 +89,21 @@ public interface CuratorCache extends Closeable, 
CuratorCacheAccessor
     }
 
     /**
+     * Start a Curator Cache Bridge builder. A Curator Cache Bridge is
+     * a facade that uses {@link 
org.apache.curator.framework.recipes.cache.CuratorCache} if
+     * persistent watches are available or {@link 
org.apache.curator.framework.recipes.cache.TreeCache}
+     * otherwise (i.e. if you are using ZooKeeper 3.5.x).
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @return bridge builder
+     */
+    static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, 
String path)
+    {
+        return new CuratorCacheBridgeBuilderImpl(client, path);
+    }
+
+    /**
      * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
      * events for all nodes found, etc.
      */
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..e7d8868
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+/**
+ * A facade that uses {@link 
org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link 
org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheBridge extends Closeable
+{
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's 
root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the listener container so that listeners can be registered to be 
notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * Return a stream over the storage entries that are the immediate 
children of the root node.
+     *
+     * @return stream over root entries
+     */
+    Stream<ChildData> streamRootChildren();
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
new file mode 100644
index 0000000..4a1920f
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.ExecutorService;
+
+public interface CuratorCacheBridgeBuilder
+{
+    /**
+     * @param options any options
+     * @return this
+     */
+    CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options);
+
+    /**
+     * The bridge cache will not retain the data bytes. i.e. ChildData objects
+     * returned by the cache will always return {@code null} for {@link 
ChildData#getData()}
+     *
+     * @return this
+     */
+    CuratorCacheBridgeBuilder withBytesNotCached();
+
+    /**
+     * If the old {@link org.apache.curator.framework.recipes.cache.TreeCache} 
is used by the bridge
+     * (i.e. you are using ZooKeeper 3.5.x) then this executor service is 
passed to {@link 
org.apache.curator.framework.recipes.cache.TreeCache.Builder#setExecutor(java.util.concurrent.ExecutorService)}.
+     * For {@link org.apache.curator.framework.recipes.cache.CuratorCache} 
this is not used and will be ignored (a warning will be logged).
+     *
+     * @param executorService executor to use for ZooKeeper 3.5.x
+     * @return this
+     */
+    @SuppressWarnings("deprecation")
+    CuratorCacheBridgeBuilder withExecutorService(ExecutorService 
executorService);
+
+    /**
+     * Return a new Curator Cache Bridge based on the builder methods that 
have been called
+     *
+     * @return new Curator Cache Bridge
+     */
+    CuratorCacheBridge build();
+}
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
new file mode 100644
index 0000000..2076992
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+
+class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder
+{
+    private final CuratorFramework client;
+    private final String path;
+    private CuratorCache.Options[] options;
+    private boolean cacheData = true;
+    private ExecutorService executorService = null;
+
+    CuratorCacheBridgeBuilderImpl(CuratorFramework client, String path)
+    {
+        this.client = client;
+        this.path = path;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... 
options)
+    {
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withBytesNotCached()
+    {
+        cacheData = false;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridgeBuilder withExecutorService(ExecutorService 
executorService)
+    {
+        this.executorService = executorService;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBridge build()
+    {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            if ( executorService != null )
+            {
+                LoggerFactory.getLogger(getClass()).warn("CuratorCache does 
not support custom ExecutorService");
+            }
+            CuratorCacheStorage storage = cacheData ? 
CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            return new CuratorCacheImpl(client, storage, path, options, null, 
null);
+        }
+        return new CompatibleCuratorCacheBridge(client, path, options, 
executorService, cacheData);
+    }
+}
\ No newline at end of file
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index c207b26..a5cc57c 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -47,7 +47,7 @@ import static 
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
@@ -136,6 +136,12 @@ class CuratorCacheImpl implements CuratorCache
         return storage.streamImmediateChildren(fromParent);
     }
 
+    @Override
+    public Stream<ChildData> streamRootChildren()
+    {
+        return storage.streamImmediateChildren(path);
+    }
+
     @VisibleForTesting
     CuratorCacheStorage storage()
     {
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..954b78d 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import java.io.Closeable;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -37,8 +40,8 @@ import java.util.Map;
  */
 public class GroupMember implements Closeable
 {
-    private final PersistentEphemeralNode pen;
-    private final PathChildrenCache cache;
+    private final PersistentNode pen;
+    private final CuratorCacheBridge cache;
     private final String thisId;
 
     /**
@@ -61,8 +64,8 @@ public class GroupMember implements Closeable
     {
         this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be 
null");
 
-        cache = newPathChildrenCache(client, membershipPath);
-        pen = newPersistentEphemeralNode(client, membershipPath, thisId, 
payload);
+        cache = CuratorCache.bridgeBuilder(client, membershipPath).build();
+        pen = new PersistentNode(client, CreateMode.EPHEMERAL, false, 
ZKPaths.makePath(membershipPath, thisId), payload);
     }
 
     /**
@@ -121,8 +124,11 @@ public class GroupMember implements Closeable
     {
         ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
         boolean thisIdAdded = false;
-        for ( ChildData data : cache.getCurrentData() )
+
+        Iterator<ChildData> iterator = cache.streamRootChildren().iterator();
+        while ( iterator.hasNext() )
         {
+            ChildData data = iterator.next();
             String id = idFromPath(data.getPath());
             thisIdAdded = thisIdAdded || id.equals(thisId);
             builder.put(id, data.getData());
@@ -144,14 +150,4 @@ public class GroupMember implements Closeable
     {
         return ZKPaths.getNodeFromPath(path);
     }
-
-    protected PersistentEphemeralNode 
newPersistentEphemeralNode(CuratorFramework client, String membershipPath, 
String thisId, byte[] payload)
-    {
-        return new PersistentEphemeralNode(client, 
PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, 
thisId), payload);
-    }
-
-    protected PathChildrenCache newPathChildrenCache(CuratorFramework client, 
String membershipPath)
-    {
-        return new PathChildrenCache(client, membershipPath, true);
-    }
 }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..4395b36 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Map;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestGroupMember extends BaseClassForTests
 {
     // NOTE - don't need many tests as this class is just a wrapper around two 
existing recipes
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 5c015f4..e324ac4 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,9 +19,13 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.StandardListenerManager;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridgeBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
@@ -42,11 +46,12 @@ import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final StandardListenerManager<ModeledCacheListener<T>> 
listenerContainer = StandardListenerManager.standard();
     private final ZPath basePath;
+    private final EnsureContainers ensureContainers;
 
     private static final class Entry<T>
     {
@@ -69,19 +74,32 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded)
 || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        CuratorCacheBridgeBuilder bridgeBuilder = 
CuratorCache.bridgeBuilder(client, 
basePath.fullPath()).withBytesNotCached().withExecutorService(executor);
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            bridgeBuilder = 
bridgeBuilder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        cache = bridgeBuilder.build();
+        
cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client,
 this).build());
+
+        if ( 
modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || 
modelSpec.createOptions().contains(CreateOption.createParentsAsContainers) )
+        {
+            ensureContainers = new EnsureContainers(client, 
basePath.fullPath());
+        }
+        else
+        {
+            ensureContainers = null;
+        }
     }
 
     public void start()
     {
         try
         {
-            cache.getListenable().addListener(this);
+            if ( ensureContainers != null )
+            {
+                ensureContainers.ensure();
+            }
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +110,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
@@ -148,7 +165,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
         }
     }
 
-    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    private void internalChildEvent(TreeCacheEvent event)
     {
         switch ( event.getType() )
         {
@@ -156,16 +173,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
         case NODE_UPDATED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
+            byte[] bytes = event.getData().getData();
+            if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's 
probably just a parent node being created
             {
-                byte[] bytes = event.getData().getData();
-                if ( (bytes != null) && (bytes.length > 0) )    // otherwise 
it's probably just a parent node being created
-                {
-                    T model = serializer.deserialize(bytes);
-                    entries.put(path, new Entry<>(event.getData().getStat(), 
model));
-                    ModeledCacheListener.Type type = (event.getType() == 
TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : 
ModeledCacheListener.Type.NODE_UPDATED;
-                    accept(type, path, event.getData().getStat(), model);
-                }
+                T model = serializer.deserialize(bytes);
+                entries.put(path, new Entry<>(event.getData().getStat(), 
model));
+                ModeledCacheListener.Type type = (event.getType() == 
TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : 
ModeledCacheListener.Type.NODE_UPDATED;
+                accept(type, path, event.getData().getStat(), model);
             }
             break;
         }
@@ -173,13 +187,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
         case NODE_REMOVED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
-            {
-                Entry<T> entry = entries.remove(path);
-                T model = (entry != null) ? entry.model : 
serializer.deserialize(event.getData().getData());
-                Stat stat = (entry != null) ? entry.stat : 
event.getData().getStat();
-                accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, 
model);
-            }
+            Entry<T> entry = entries.remove(path);
+            T model = (entry != null) ? entry.model : 
serializer.deserialize(event.getData().getData());
+            Stat stat = (entry != null) ? entry.stat : 
event.getData().getStat();
+            accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
             break;
         }
 
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..70f9c66 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..d409f99 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import 
org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -41,4 +42,6 @@ public interface ServiceCache<T> extends Closeable, 
Listenable<ServiceCacheListe
      * @throws Exception errors
      */
     public void start() throws Exception;
+
+    CountDownLatch startImmediate() throws Exception;
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 326a16c..cfec2c4 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
@@ -59,13 +58,4 @@ public interface ServiceCacheBuilder<T>
      * @return this
      */
     public ServiceCacheBuilder<T> executorService(ExecutorService 
executorService);
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background 
thread. The specified ExecutorService
-     * overrides any prior ThreadFactory or ExecutorService set on the 
ServiceCacheBuilder.
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    public ServiceCacheBuilder<T> executorService(CloseableExecutorService 
executorService);
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 02948a3..4c0544d 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -30,7 +29,7 @@ public interface ServiceProviderBuilder<T>
      *
      * @return provider
      */
-    public ServiceProvider<T> build();
+    ServiceProvider<T> build();
 
     /**
      * required - set the name of the service to be provided
@@ -38,7 +37,7 @@ public interface ServiceProviderBuilder<T>
      * @param serviceName the name of the service
      * @return this
      */
-    public ServiceProviderBuilder<T> serviceName(String serviceName);
+    ServiceProviderBuilder<T> serviceName(String serviceName);
 
     /**
      * optional - set the provider strategy. The default is {@link 
RoundRobinStrategy}
@@ -46,7 +45,7 @@ public interface ServiceProviderBuilder<T>
      * @param providerStrategy strategy to use
      * @return this
      */
-    public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> 
providerStrategy);
+    ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> 
providerStrategy);
 
     /**
      * optional - the thread factory to use for creating internal threads. The 
specified ThreadFactory overrides
@@ -57,7 +56,7 @@ public interface ServiceProviderBuilder<T>
      * @deprecated use {@link #executorService(ExecutorService)} instead
      */
     @Deprecated
-    public ServiceProviderBuilder<T> threadFactory(ThreadFactory 
threadFactory);
+    ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory);
 
     /**
      * Set the down instance policy
@@ -65,7 +64,7 @@ public interface ServiceProviderBuilder<T>
      * @param downInstancePolicy new policy
      * @return this
      */
-    public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy 
downInstancePolicy);
+    ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy 
downInstancePolicy);
 
     /**
      * Add an instance filter. NOTE: this does not remove previously added 
filters. i.e.
@@ -75,7 +74,7 @@ public interface ServiceProviderBuilder<T>
      * @param filter filter to add
      * @return this
      */
-    public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> 
filter);
+    ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter);
 
     /**
      * Optional ExecutorService to use for the cache's background thread. The 
specified ExecutorService
@@ -85,14 +84,5 @@ public interface ServiceProviderBuilder<T>
      * @param executorService executor service
      * @return this
      */
-    public ServiceProviderBuilder<T> executorService(ExecutorService 
executorService);
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background 
thread. The specified CloseableExecutorService
-     * overrides any prior ThreadFactory or CloseableExecutorService set on 
the ServiceProviderBuilder.
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    public ServiceProviderBuilder<T> executorService(CloseableExecutorService 
executorService);
+    ServiceProviderBuilder<T> executorService(ExecutorService executorService);
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 7647c0f..cf7f468 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.discovery.details;
 
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceCacheBuilder;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +31,7 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
     private ServiceDiscoveryImpl<T> discovery;
     private String name;
     private ThreadFactory threadFactory;
-    private CloseableExecutorService executorService;
+    private ExecutorService executorService;
 
     ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
     {
@@ -47,13 +46,13 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        if (executorService != null)
+        if (threadFactory != null)
         {
-            return new ServiceCacheImpl<T>(discovery, name, executorService);
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
         }
         else
         {
-            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
         }
     }
 
@@ -92,20 +91,9 @@ class ServiceCacheBuilderImpl<T> implements 
ServiceCacheBuilder<T>
      * @return this
      */
     @Override
-    public ServiceCacheBuilder<T> executorService(ExecutorService 
executorService) {
-        return executorService(new CloseableExecutorService(executorService, 
false));
-    }
-
-    /**
-     * Optional CloseableExecutorService to use for the cache's background 
thread
-     *
-     * @param executorService an instance of CloseableExecutorService
-     * @return this
-     */
-    @Override
-    public ServiceCacheBuilder<T> executorService(CloseableExecutorService 
executorService) {
+    public ServiceCacheBuilder<T> executorService(ExecutorService 
executorService)
+    {
         this.executorService = executorService;
-        this.threadFactory = null;
         return this;
     }
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index c154836..f0316ac 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -20,26 +20,27 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
@@ -48,19 +49,21 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
 {
     private final StandardListenerManager<ServiceCacheListener> 
listenerContainer = StandardListenerManager.standard();
     private final ServiceDiscoveryImpl<T> discovery;
-    private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache cache;
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
+    private final CuratorCacheBridge cache;
     private final ConcurrentMap<String, ServiceInstance<T>> instances = 
Maps.newConcurrentMap();
+    private final EnsureContainers ensureContainers;
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
 
     private enum State
     {
         LATENT, STARTED, STOPPED
     }
 
-    private static CloseableExecutorService convertThreadFactory(ThreadFactory 
threadFactory)
+    private static ExecutorService convertThreadFactory(ThreadFactory 
threadFactory)
     {
         Preconditions.checkNotNull(threadFactory, "threadFactory cannot be 
null");
-        return new 
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), 
true);
+        return Executors.newSingleThreadExecutor(threadFactory);
     }
 
     ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, 
ThreadFactory threadFactory)
@@ -68,16 +71,24 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         this(discovery, name, convertThreadFactory(threadFactory));
     }
 
-    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, 
CloseableExecutorService executorService)
+    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, 
ExecutorService executorService)
     {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(executorService, "executorService cannot be 
null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), 
discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        String path = discovery.pathForName(name);
+        cache = CuratorCache.bridgeBuilder(discovery.getClient(), path)
+            .withExecutorService(executorService)
+            .withBytesNotCached()
+            .build();
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forPathChildrenCache(discovery.getClient(), this)
+            .forInitialized(this::initialized)
+            .build();
+        cache.listenable().addListener(listener);
+        ensureContainers = new EnsureContainers(discovery.getClient(), path);
     }
 
     @Override
@@ -93,11 +104,19 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        ensureContainers.ensure();
+        cache.start();
         if ( debugStartLatch != null )
         {
+            initializedLatch.await();
             debugStartLatch.countDown();
             debugStartLatch = null;
         }
@@ -107,18 +126,11 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by 
the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, 
State.STOPPED), "Already closed or has not been started");
 
@@ -152,7 +164,7 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     }
 
     @Override
-    public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event) throws Exception
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event)
     {
         boolean notifyListeners = false;
         switch ( event.getType() )
@@ -160,7 +172,7 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         case CHILD_ADDED:
         case CHILD_UPDATED:
         {
-            addInstance(event.getData(), false);
+            addInstance(event.getData());
             notifyListeners = true;
             break;
         }
@@ -173,7 +185,7 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
             listenerContainer.forEach(ServiceCacheListener::cacheChanged);
         }
@@ -184,18 +196,23 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws 
Exception
+    private void addInstance(ChildData childData)
     {
-        String instanceId = instanceIdFromData(childData);
-        ServiceInstance<T> serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), 
childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..c072213 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     private static class Entry<T>
     {
         private volatile ServiceInstance<T> service;
-        private volatile NodeCache cache;
+        private volatile CuratorCacheBridge cache;
 
         private Entry(ServiceInstance<T> service)
         {
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        return new ServiceCacheBuilderImpl<T>(this)
-            .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+        return new ServiceCacheBuilderImpl<T>(this);
     }
 
     /**
@@ -458,52 +458,48 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         }
     }
 
-    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+    private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
     {
         if ( !watchInstances )
         {
             return null;
         }
 
-        final NodeCache nodeCache = new NodeCache(client, 
pathForInstance(instance.getName(), instance.getId()));
-        try
-        {
-            nodeCache.start(true);
-        }
-        catch ( InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-        catch ( Exception e )
-        {
-            log.error("Could not start node cache for: " + instance, e);
-        }
-        NodeCacheListener listener = new NodeCacheListener()
-        {
-            @Override
-            public void nodeChanged() throws Exception
-            {
-                if ( nodeCache.getCurrentData() != null )
+        CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client, 
pathForInstance(instance.getName(), instance.getId()))
+            .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+            .withBytesNotCached()
+            .build();
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .afterInitialized()
+            .forAll((__, ___, data) -> {
+                if ( data != null )
                 {
-                    ServiceInstance<T> newInstance = 
serializer.deserialize(nodeCache.getCurrentData().getData());
-                    Entry<T> entry = services.get(newInstance.getId());
-                    if ( entry != null )
+                    try
                     {
-                        synchronized(entry)
+                        ServiceInstance<T> newInstance = 
serializer.deserialize(data.getData());
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            entry.service = newInstance;
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
+                    catch ( Exception e )
+                    {
+                        log.debug("Could not deserialize: " + data.getPath());
+                    }
                 }
                 else
                 {
                     log.warn("Instance data has been deleted for: " + 
instance);
                 }
-            }
-        };
-        nodeCache.getListenable().addListener(listener);
-        return nodeCache;
+            })
+            .build();
+        cache.listenable().addListener(listener);
+        cache.start();
+        return cache;
     }
 
     private void internalUnregisterService(final Entry<T> entry) throws 
Exception
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index e36700b..2f0bbb2 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
@@ -39,7 +38,7 @@ class ServiceProviderBuilderImpl<T> implements 
ServiceProviderBuilder<T>
     private String serviceName;
     private ProviderStrategy<T> providerStrategy;
     private ThreadFactory threadFactory;
-    private CloseableExecutorService executorService;
+    private ExecutorService executorService;
     private List<InstanceFilter<T>> filters = Lists.newArrayList();
     private DownInstancePolicy downInstancePolicy = new DownInstancePolicy();
 
@@ -111,12 +110,6 @@ class ServiceProviderBuilderImpl<T> implements 
ServiceProviderBuilder<T>
     @Override
     public ServiceProviderBuilder<T> executorService(ExecutorService 
executorService)
     {
-        return executorService(new CloseableExecutorService(executorService));
-    }
-
-    @Override
-    public ServiceProviderBuilder<T> executorService(CloseableExecutorService 
executorService)
-    {
         this.executorService = executorService;
         this.threadFactory = null;
         return this;
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index d9787e4..45b4b9f 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.DownInstancePolicy;
 import org.apache.curator.x.discovery.InstanceFilter;
 import org.apache.curator.x.discovery.ProviderStrategy;
@@ -51,38 +50,27 @@ public class ServiceProviderImpl<T> implements 
ServiceProvider<T>
         this(discovery, serviceName, providerStrategy, threadFactory, null, 
filters, downInstancePolicy);
     }
 
-    public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService 
executorService, List<InstanceFilter<T>> filters, DownInstancePolicy 
downInstancePolicy)
-    {
-        this(discovery, serviceName, providerStrategy, null, executorService, 
filters, downInstancePolicy);
-    }
-
-    protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, 
CloseableExecutorService executorService, List<InstanceFilter<T>> filters, 
DownInstancePolicy downInstancePolicy)
+    protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String 
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, 
ExecutorService executorService, List<InstanceFilter<T>> filters, 
DownInstancePolicy downInstancePolicy)
     {
         this.discovery = discovery;
         this.providerStrategy = providerStrategy;
 
-        downInstanceManager = new DownInstanceManager<T>(downInstancePolicy);
-        final ServiceCacheBuilder builder = 
discovery.serviceCacheBuilder().name(serviceName);
+        downInstanceManager = new DownInstanceManager<>(downInstancePolicy);
+        final ServiceCacheBuilder<T> builder = 
discovery.serviceCacheBuilder().name(serviceName);
         if (executorService != null)
         {
             builder.executorService(executorService);
         } else
         {
+            //noinspection deprecation
             builder.threadFactory(threadFactory);
         }
         cache = builder.build();
 
         ArrayList<InstanceFilter<T>> localFilters = 
Lists.newArrayList(filters);
         localFilters.add(downInstanceManager);
-        localFilters.add(new InstanceFilter<T>()
-        {
-            @Override
-            public boolean apply(ServiceInstance<T> instance)
-            {
-                return instance.isEnabled();
-            }
-        });
-        instanceProvider = new FilteredInstanceProvider<T>(cache, 
localFilters);
+        localFilters.add(ServiceInstance::isEnabled);
+        instanceProvider = new FilteredInstanceProvider<>(cache, localFilters);
     }
 
     /**
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
index 590c55c..3d357f6 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.strategies.RandomStrategy;
+import org.testng.annotations.Test;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class ServiceCacheLeakTester
 {
     public static void main(String[] args) throws Exception
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..6d24586 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,7 +27,9 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -40,6 +42,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
@@ -261,6 +264,11 @@ public class TestServiceCache extends BaseClassForTests
     @Test
     public void testExecutorServiceIsInvoked() throws Exception
     {
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return; // for ZK 3.6 the underlying cache ignores the executor
+        }
+
         List<Closeable> closeables = Lists.newArrayList();
         try
         {
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..9604c19 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceCacheRace extends BaseClassForTests
 {
     private final Timing timing = new Timing();
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index a1c8cfe..7a23e16 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceDiscovery extends BaseClassForTests
 {
     private static final Comparator<ServiceInstance<Void>> comparator = new 
Comparator<ServiceInstance<Void>>()
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
index 312c884..fd0c438 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
@@ -22,15 +22,17 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceDiscoveryBuilder extends BaseClassForTests
 {
     @Test
-    public void testDefaultSerializer() throws Exception
+    public void testDefaultSerializer()
     {        
         CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
         ServiceDiscoveryBuilder<Object> builder = 
ServiceDiscoveryBuilder.builder(Object.class).client(client);
@@ -41,7 +43,7 @@ public class TestServiceDiscoveryBuilder extends 
BaseClassForTests
     }
 
     @Test
-    public void testSetSerializer() throws Exception
+    public void testSetSerializer()
     {
         CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
         ServiceDiscoveryBuilder<Object> builder = 
ServiceDiscoveryBuilder.builder(Object.class).client(client);
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
index d7358fe..b743e07 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -36,6 +37,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestServiceProvider extends BaseClassForTests
 {
 
diff --git a/src/site/confluence/breaking-changes.confluence 
b/src/site/confluence/breaking-changes.confluence
index 3170a16..af0dc04 100644
--- a/src/site/confluence/breaking-changes.confluence
+++ b/src/site/confluence/breaking-changes.confluence
@@ -8,3 +8,6 @@ need to use Curator with ZooKeeper 3.4.x you will need to use a 
previous version
 * Exhibitor support has been removed.
 * {{ConnectionHandlingPolicy}} and related classes have been removed.
 * The {{Reaper}} and {{ChildReaper}} classes/recipes have been removed. You 
should use ZooKeeper container nodes instead.
+* {{newPersistentEphemeralNode(}} and {{newPathChildrenCache}} were removed 
from {{GroupMember}}
+* {{ServiceCacheBuilder<T> executorService(CloseableExecutorService 
executorService)} was removed from {{ServiceCacheBuilder}}
+* {{ServiceProviderBuilder<T> executorService(CloseableExecutorService 
executorService);)} was removed from {{ServiceProviderBuilder}}

Reply via email to