This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new b29bb01 CURATOR-549
b29bb01 is described below
commit b29bb010ada7f17626438b828de1f9e122e5d7bf
Author: randgalt <[email protected]>
AuthorDate: Fri Mar 20 14:48:18 2020 -0500
CURATOR-549
Creates a simple bridge that, when using ZK 3.6.0 or later, creates a CuratorCache
and, for earlier versions, creates a TreeCache. The curator-test-zk35 module
ensures that both code paths are tested.
---
.../cache/CompatibleCuratorCacheBridge.java | 144 +++++++++++++++++++++
.../framework/recipes/cache/CuratorCache.java | 15 +++
.../recipes/cache/CuratorCacheBridge.java | 36 ++++++
.../recipes/cache/CuratorCacheBridgeBuilder.java | 57 ++++++++
.../cache/CuratorCacheBridgeBuilderImpl.java | 77 +++++++++++
.../framework/recipes/cache/CuratorCacheImpl.java | 8 +-
.../recipes/cache/CuratorCacheStorage.java | 2 +-
.../recipes/cache/TreeCacheListenerWrapper.java | 15 +--
.../framework/recipes/nodes/GroupMember.java | 32 ++---
.../recipes/cache/TestCuratorCacheBridge.java | 66 ++++++++++
.../framework/recipes/nodes/TestGroupMember.java | 2 +
.../x/async/modeled/details/ModeledCacheImpl.java | 65 ++++++----
.../async/modeled/TestCachedModeledFramework.java | 2 +
.../apache/curator/x/discovery/ServiceCache.java | 3 +
.../curator/x/discovery/ServiceCacheBuilder.java | 10 --
.../x/discovery/ServiceProviderBuilder.java | 24 +---
.../discovery/details/ServiceCacheBuilderImpl.java | 24 +---
.../x/discovery/details/ServiceCacheImpl.java | 83 +++++++-----
.../x/discovery/details/ServiceDiscoveryImpl.java | 66 +++++-----
.../details/ServiceProviderBuilderImpl.java | 9 +-
.../x/discovery/details/ServiceProviderImpl.java | 24 +---
.../x/discovery/ServiceCacheLeakTester.java | 3 +
.../curator/x/discovery/TestServiceCache.java | 8 ++
.../x/discovery/details/TestServiceCacheRace.java | 2 +
.../x/discovery/details/TestServiceDiscovery.java | 2 +
.../details/TestServiceDiscoveryBuilder.java | 6 +-
.../x/discovery/details/TestServiceProvider.java | 2 +
src/site/confluence/breaking-changes.confluence | 3 +
28 files changed, 595 insertions(+), 195 deletions(-)
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..97293f2
--- /dev/null
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+/**
+ * Version of CuratorCacheBridge for pre-ZK 3.6 - uses TreeCache instead of
CuratorCache
+ */
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge,
TreeCacheListener
+{
+ private final TreeCache cache;
+ private final StandardListenerManager<CuratorCacheListener>
listenerManager = StandardListenerManager.standard();
+
+ CompatibleCuratorCacheBridge(CuratorFramework client, String path,
CuratorCache.Options[] optionsArg, ExecutorService executorService, boolean
cacheData)
+ {
+ Set<CuratorCache.Options> options = (optionsArg != null) ?
Sets.newHashSet(optionsArg) : Collections.emptySet();
+ TreeCache.Builder builder = TreeCache.newBuilder(client,
path).setCacheData(cacheData);
+ if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+ {
+ builder.setMaxDepth(0);
+ }
+ if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+ {
+ builder.setDataIsCompressed(true);
+ }
+ if ( executorService != null )
+ {
+ builder.setExecutor(executorService);
+ }
+ cache = builder.build();
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ cache.getListenable().addListener(this);
+
+ cache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ }
+
+ @Override
+ public boolean isCuratorCache()
+ {
+ return false;
+ }
+
+ @Override
+ public Listenable<CuratorCacheListener> listenable()
+ {
+ return listenerManager;
+ }
+
+ @Override
+ public Optional<ChildData> get(String path)
+ {
+ return Optional.ofNullable(cache.getCurrentData(path));
+ }
+
+ @Override
+ public int size()
+ {
+ return cache.size();
+ }
+
+ @Override
+ public Stream<ChildData> stream()
+ {
+ Iterable<ChildData> iterable = cache::iterator;
+ return StreamSupport.stream(iterable.spliterator(), false);
+ }
+
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event)
throws Exception
+ {
+ switch ( event.getType() )
+ {
+ case NODE_ADDED:
+ {
+ listenerManager.forEach(listener ->
listener.event(NODE_CREATED, null, event.getData()));
+ break;
+ }
+
+ case NODE_REMOVED:
+ {
+ listenerManager.forEach(listener ->
listener.event(NODE_DELETED, event.getData(), null));
+ break;
+ }
+
+ case NODE_UPDATED:
+ {
+ listenerManager.forEach(listener ->
listener.event(NODE_CHANGED, event.getOldData(), event.getData()));
+ break;
+ }
+
+ case INITIALIZED:
+ {
+ listenerManager.forEach(CuratorCacheListener::initialized);
+ break;
+ }
+ }
+ }
+}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
index 8edca58..8005f03 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -98,6 +98,21 @@ public interface CuratorCache extends Closeable,
CuratorCacheAccessor
}
/**
+ * Start a Curator Cache Bridge builder. A Curator Cache Bridge is
+ * a facade that uses {@link
org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or {@link
org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise (i.e. if you are using ZooKeeper 3.5.x).
+ *
+ * @param client Curator client
+ * @param path path to cache
+ * @return bridge builder
+ */
+ static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client,
String path)
+ {
+ return new CuratorCacheBridgeBuilderImpl(client, path);
+ }
+
+ /**
* Start the cache. This will cause a complete refresh from the cache's
root node and generate
* events for all nodes found, etc.
*/
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..b6ef9b9
--- /dev/null
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * A facade that uses {@link
org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link
org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheBridge extends CuratorCache
+{
+ /**
+ * Returns true if the underlying cache is {@link
org.apache.curator.framework.recipes.cache.CuratorCache} (i.e. ZooKeeper 3.6+).
+ * Otherwise it is {@link
org.apache.curator.framework.recipes.cache.TreeCache} (i.e. ZooKeeper 3.5.x)
+ *
+ * @return true/false
+ */
+ boolean isCuratorCache();
+}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
new file mode 100644
index 0000000..f84f56c
--- /dev/null
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.ExecutorService;
+
+public interface CuratorCacheBridgeBuilder
+{
+ /**
+ * @param options any options
+ * @return this
+ */
+ CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options);
+
+ /**
+ * The bridge cache will not retain the data bytes. i.e. ChildData objects
+ * returned by the cache will always return {@code null} for {@link
ChildData#getData()}
+ *
+ * @return this
+ */
+ CuratorCacheBridgeBuilder withDataNotCached();
+
+ /**
+ * If the old {@link org.apache.curator.framework.recipes.cache.TreeCache}
is used by the bridge
+ * (i.e. you are using ZooKeeper 3.5.x) then this executor service is
passed to {@link
org.apache.curator.framework.recipes.cache.TreeCache.Builder#setExecutor(java.util.concurrent.ExecutorService)}.
+ * For {@link org.apache.curator.framework.recipes.cache.CuratorCache}
this is not used and will be ignored (a warning will be logged).
+ *
+ * @param executorService executor to use for ZooKeeper 3.5.x
+ * @return this
+ */
+ @SuppressWarnings("deprecation")
+ CuratorCacheBridgeBuilder withExecutorService(ExecutorService
executorService);
+
+ /**
+ * Return a new Curator Cache Bridge based on the builder methods that
have been called
+ *
+ * @return new Curator Cache Bridge
+ */
+ CuratorCacheBridge build();
+}
\ No newline at end of file
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
new file mode 100644
index 0000000..054c0ba
--- /dev/null
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+
+class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder
+{
+ private final CuratorFramework client;
+ private final String path;
+ private CuratorCache.Options[] options;
+ private boolean cacheData = true;
+ private ExecutorService executorService = null;
+ private final boolean forceTreeCache =
Boolean.getBoolean("curator-cache-bridge-force-tree-cache");
+
+ CuratorCacheBridgeBuilderImpl(CuratorFramework client, String path)
+ {
+ this.client = client;
+ this.path = path;
+ }
+
+ @Override
+ public CuratorCacheBridgeBuilder withOptions(CuratorCache.Options...
options)
+ {
+ this.options = options;
+ return this;
+ }
+
+ @Override
+ public CuratorCacheBridgeBuilder withDataNotCached()
+ {
+ cacheData = false;
+ return this;
+ }
+
+ @Override
+ public CuratorCacheBridgeBuilder withExecutorService(ExecutorService
executorService)
+ {
+ this.executorService = executorService;
+ return this;
+ }
+
+ @Override
+ public CuratorCacheBridge build()
+ {
+ if ( !forceTreeCache && Compatibility.hasPersistentWatchers() )
+ {
+ if ( executorService != null )
+ {
+ LoggerFactory.getLogger(getClass()).warn("CuratorCache does
not support custom ExecutorService");
+ }
+ CuratorCacheStorage storage = cacheData ?
CuratorCacheStorage.standard() : CuratorCacheStorage.dataNotCached();
+ return new CuratorCacheImpl(client, storage, path, options, null);
+ }
+ return new CompatibleCuratorCacheBridge(client, path, options,
executorService, cacheData);
+ }
+}
\ No newline at end of file
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 7a79690..1446204 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -45,7 +45,7 @@ import static
org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<State> state = new
AtomicReference<>(State.LATENT);
@@ -103,6 +103,12 @@ class CuratorCacheImpl implements CuratorCache
}
@Override
+ public boolean isCuratorCache()
+ {
+ return true;
+ }
+
+ @Override
public Listenable<CuratorCacheListener> listenable()
{
return listenerManager;
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index a65c3f0..7540b56 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -42,7 +42,7 @@ public interface CuratorCacheStorage extends
CuratorCacheAccessor
*
* @return storage instance that does not retain data bytes
*/
- static CuratorCacheStorage bytesNotCached()
+ static CuratorCacheStorage dataNotCached()
{
return new StandardCuratorCacheStorage(false);
}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
index 570799b..9c99f69 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -20,9 +20,6 @@
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
class TreeCacheListenerWrapper implements CuratorCacheListener
{
@@ -42,19 +39,19 @@ class TreeCacheListenerWrapper implements
CuratorCacheListener
{
case NODE_CREATED:
{
- sendEvent(data, TreeCacheEvent.Type.NODE_ADDED);
+ sendEvent(data, null, TreeCacheEvent.Type.NODE_ADDED);
break;
}
case NODE_CHANGED:
{
- sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED);
+ sendEvent(data, oldData, TreeCacheEvent.Type.NODE_UPDATED);
break;
}
case NODE_DELETED:
{
- sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+ sendEvent(oldData, null, TreeCacheEvent.Type.NODE_REMOVED);
break;
}
}
@@ -63,12 +60,12 @@ class TreeCacheListenerWrapper implements
CuratorCacheListener
@Override
public void initialized()
{
- sendEvent(null, TreeCacheEvent.Type.INITIALIZED);
+ sendEvent(null, null, TreeCacheEvent.Type.INITIALIZED);
}
- private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+ private void sendEvent(ChildData node, ChildData oldNode,
TreeCacheEvent.Type type)
{
- TreeCacheEvent event = new TreeCacheEvent(type, node);
+ TreeCacheEvent event = new TreeCacheEvent(type, node, oldNode);
try
{
listener.childEvent(client, event);
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..681e3fd 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -24,21 +24,27 @@ import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
import java.io.Closeable;
+import java.util.Iterator;
import java.util.Map;
+import static
org.apache.curator.framework.recipes.cache.CuratorCacheAccessor.parentPathFilter;
+
/**
* Group membership management. Adds this instance into a group and
* keeps a cache of members in the group
*/
public class GroupMember implements Closeable
{
- private final PersistentEphemeralNode pen;
- private final PathChildrenCache cache;
+ private final PersistentNode pen;
+ private final CuratorCacheBridge cache;
+ private final String membershipPath;
private final String thisId;
/**
@@ -59,10 +65,11 @@ public class GroupMember implements Closeable
*/
public GroupMember(CuratorFramework client, String membershipPath, String
thisId, byte[] payload)
{
+ this.membershipPath = membershipPath;
this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be
null");
- cache = newPathChildrenCache(client, membershipPath);
- pen = newPersistentEphemeralNode(client, membershipPath, thisId,
payload);
+ cache = CuratorCache.bridgeBuilder(client, membershipPath).build();
+ pen = new PersistentNode(client, CreateMode.EPHEMERAL, false,
ZKPaths.makePath(membershipPath, thisId), payload);
}
/**
@@ -121,8 +128,11 @@ public class GroupMember implements Closeable
{
ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
boolean thisIdAdded = false;
- for ( ChildData data : cache.getCurrentData() )
+
+ Iterator<ChildData> iterator =
cache.stream().filter(parentPathFilter(membershipPath)).iterator();
+ while ( iterator.hasNext() )
{
+ ChildData data = iterator.next();
String id = idFromPath(data.getPath());
thisIdAdded = thisIdAdded || id.equals(thisId);
builder.put(id, data.getData());
@@ -144,14 +154,4 @@ public class GroupMember implements Closeable
{
return ZKPaths.getNodeFromPath(path);
}
-
- protected PersistentEphemeralNode
newPersistentEphemeralNode(CuratorFramework client, String membershipPath,
String thisId, byte[] payload)
- {
- return new PersistentEphemeralNode(client,
PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath,
thisId), payload);
- }
-
- protected PathChildrenCache newPathChildrenCache(CuratorFramework client,
String membershipPath)
- {
- return new PathChildrenCache(client, membershipPath, true);
- }
}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheBridge.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheBridge.java
new file mode 100644
index 0000000..b663ff2
--- /dev/null
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheBridge.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestCuratorCacheBridge extends CuratorTestBase
+{
+ @Test
+ public void testImplementationSelection()
+ {
+ try (CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1)))
+ {
+ CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client,
"/foo").build();
+ if ( Compatibility.hasPersistentWatchers() )
+ {
+ Assert.assertTrue(cache instanceof CuratorCacheImpl);
+ Assert.assertTrue(cache.isCuratorCache());
+ }
+ else
+ {
+ Assert.assertTrue(cache instanceof
CompatibleCuratorCacheBridge);
+ Assert.assertFalse(cache.isCuratorCache());
+ }
+ }
+ }
+
+ @Test
+ public void testForceTreeCache()
+ {
+ System.setProperty("curator-cache-bridge-force-tree-cache", "true");
+ try (CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1)))
+ {
+ CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client,
"/foo").build();
+ Assert.assertTrue(cache instanceof CompatibleCuratorCacheBridge);
+ Assert.assertFalse(cache.isCuratorCache());
+ }
+ finally
+ {
+ System.clearProperty("curator-cache-bridge-force-tree-cache");
+ }
+ }
+}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..4395b36 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Map;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestGroupMember extends BaseClassForTests
{
// NOTE - don't need many tests as this class is just a wrapper around two
existing recipes
diff --git
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 5c015f4..df7200d 100644
---
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,9 +19,13 @@
package org.apache.curator.x.async.modeled.details;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.StandardListenerManager;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridgeBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.utils.ThreadUtils;
@@ -42,11 +46,12 @@ import java.util.stream.Collectors;
class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
{
- private final TreeCache cache;
+ private final CuratorCacheBridge cache;
private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
private final ModelSerializer<T> serializer;
private final StandardListenerManager<ModeledCacheListener<T>>
listenerContainer = StandardListenerManager.standard();
private final ZPath basePath;
+ private final EnsureContainers ensureContainers;
private static final class Entry<T>
{
@@ -69,19 +74,32 @@ class ModeledCacheImpl<T> implements TreeCacheListener,
ModeledCache<T>
basePath = modelSpec.path();
this.serializer = modelSpec.serializer();
- cache = TreeCache.newBuilder(client, basePath.fullPath())
- .setCacheData(false)
-
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
- .setExecutor(executor)
-
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded)
|| modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
- .build();
+ CuratorCacheBridgeBuilder bridgeBuilder =
CuratorCache.bridgeBuilder(client,
basePath.fullPath()).withDataNotCached().withExecutorService(executor);
+ if ( modelSpec.createOptions().contains(CreateOption.compress) )
+ {
+ bridgeBuilder =
bridgeBuilder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+ }
+ cache = bridgeBuilder.build();
+
cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client,
this).build());
+
+ if (
modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) ||
modelSpec.createOptions().contains(CreateOption.createParentsAsContainers) )
+ {
+ ensureContainers = new EnsureContainers(client,
basePath.fullPath());
+ }
+ else
+ {
+ ensureContainers = null;
+ }
}
public void start()
{
try
{
- cache.getListenable().addListener(this);
+ if ( ensureContainers != null )
+ {
+ ensureContainers.ensure();
+ }
cache.start();
}
catch ( Exception e )
@@ -92,7 +110,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener,
ModeledCache<T>
public void close()
{
- cache.getListenable().removeListener(this);
cache.close();
entries.clear();
}
@@ -148,7 +165,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener,
ModeledCache<T>
}
}
- private void internalChildEvent(TreeCacheEvent event) throws Exception
+ private void internalChildEvent(TreeCacheEvent event)
{
switch ( event.getType() )
{
@@ -156,16 +173,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener,
ModeledCache<T>
case NODE_UPDATED:
{
ZPath path = ZPath.parse(event.getData().getPath());
- if ( !path.equals(basePath) )
+ byte[] bytes = event.getData().getData();
+ if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's
probably just a parent node being created
{
- byte[] bytes = event.getData().getData();
- if ( (bytes != null) && (bytes.length > 0) ) // otherwise
it's probably just a parent node being created
- {
- T model = serializer.deserialize(bytes);
- entries.put(path, new Entry<>(event.getData().getStat(),
model));
- ModeledCacheListener.Type type = (event.getType() ==
TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED :
ModeledCacheListener.Type.NODE_UPDATED;
- accept(type, path, event.getData().getStat(), model);
- }
+ T model = serializer.deserialize(bytes);
+ entries.put(path, new Entry<>(event.getData().getStat(),
model));
+ ModeledCacheListener.Type type = (event.getType() ==
TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED :
ModeledCacheListener.Type.NODE_UPDATED;
+ accept(type, path, event.getData().getStat(), model);
}
break;
}
@@ -173,13 +187,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener,
ModeledCache<T>
case NODE_REMOVED:
{
ZPath path = ZPath.parse(event.getData().getPath());
- if ( !path.equals(basePath) )
- {
- Entry<T> entry = entries.remove(path);
- T model = (entry != null) ? entry.model :
serializer.deserialize(event.getData().getData());
- Stat stat = (entry != null) ? entry.stat :
event.getData().getStat();
- accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat,
model);
- }
+ Entry<T> entry = entries.remove(path);
+ T model = (entry != null) ? entry.model :
serializer.deserialize(event.getData().getData());
+ Stat stat = (entry != null) ? entry.stat :
event.getData().getStat();
+ accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
break;
}
diff --git
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..70f9c66 100644
---
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
import com.google.common.collect.Sets;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestCachedModeledFramework extends TestModeledFrameworkBase
{
@Test
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..d409f99 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import
org.apache.curator.x.discovery.details.InstanceProvider;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import java.io.Closeable;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
public interface ServiceCache<T> extends Closeable,
Listenable<ServiceCacheListener>, InstanceProvider<T>
{
@@ -41,4 +42,6 @@ public interface ServiceCache<T> extends Closeable,
Listenable<ServiceCacheListe
* @throws Exception errors
*/
public void start() throws Exception;
+
+ CountDownLatch startImmediate() throws Exception;
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 326a16c..cfec2c4 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.discovery;
-import org.apache.curator.utils.CloseableExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -59,13 +58,4 @@ public interface ServiceCacheBuilder<T>
* @return this
*/
public ServiceCacheBuilder<T> executorService(ExecutorService
executorService);
-
- /**
- * Optional CloseableExecutorService to use for the cache's background
thread. The specified ExecutorService
- * overrides any prior ThreadFactory or ExecutorService set on the
ServiceCacheBuilder.
- *
- * @param executorService an instance of CloseableExecutorService
- * @return this
- */
- public ServiceCacheBuilder<T> executorService(CloseableExecutorService
executorService);
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
index 02948a3..4c0544d 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.discovery;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -30,7 +29,7 @@ public interface ServiceProviderBuilder<T>
*
* @return provider
*/
- public ServiceProvider<T> build();
+ ServiceProvider<T> build();
/**
* required - set the name of the service to be provided
@@ -38,7 +37,7 @@ public interface ServiceProviderBuilder<T>
* @param serviceName the name of the service
* @return this
*/
- public ServiceProviderBuilder<T> serviceName(String serviceName);
+ ServiceProviderBuilder<T> serviceName(String serviceName);
/**
* optional - set the provider strategy. The default is {@link
RoundRobinStrategy}
@@ -46,7 +45,7 @@ public interface ServiceProviderBuilder<T>
* @param providerStrategy strategy to use
* @return this
*/
- public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T>
providerStrategy);
+ ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T>
providerStrategy);
/**
* optional - the thread factory to use for creating internal threads. The
specified ThreadFactory overrides
@@ -57,7 +56,7 @@ public interface ServiceProviderBuilder<T>
* @deprecated use {@link #executorService(ExecutorService)} instead
*/
@Deprecated
- public ServiceProviderBuilder<T> threadFactory(ThreadFactory
threadFactory);
+ ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory);
/**
* Set the down instance policy
@@ -65,7 +64,7 @@ public interface ServiceProviderBuilder<T>
* @param downInstancePolicy new policy
* @return this
*/
- public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy
downInstancePolicy);
+ ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy
downInstancePolicy);
/**
* Add an instance filter. NOTE: this does not remove previously added
filters. i.e.
@@ -75,7 +74,7 @@ public interface ServiceProviderBuilder<T>
* @param filter filter to add
* @return this
*/
- public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T>
filter);
+ ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter);
/**
* Optional ExecutorService to use for the cache's background thread. The
specified ExecutorService
@@ -85,14 +84,5 @@ public interface ServiceProviderBuilder<T>
* @param executorService executor service
* @return this
*/
- public ServiceProviderBuilder<T> executorService(ExecutorService
executorService);
-
- /**
- * Optional CloseableExecutorService to use for the cache's background
thread. The specified CloseableExecutorService
- * overrides any prior ThreadFactory or CloseableExecutorService set on
the ServiceProviderBuilder.
- *
- * @param executorService an instance of CloseableExecutorService
- * @return this
- */
- public ServiceProviderBuilder<T> executorService(CloseableExecutorService
executorService);
+ ServiceProviderBuilder<T> executorService(ExecutorService executorService);
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 7647c0f..cf7f468 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.discovery.details;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceCacheBuilder;
import java.util.concurrent.ExecutorService;
@@ -32,7 +31,7 @@ class ServiceCacheBuilderImpl<T> implements
ServiceCacheBuilder<T>
private ServiceDiscoveryImpl<T> discovery;
private String name;
private ThreadFactory threadFactory;
- private CloseableExecutorService executorService;
+ private ExecutorService executorService;
ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
{
@@ -47,13 +46,13 @@ class ServiceCacheBuilderImpl<T> implements
ServiceCacheBuilder<T>
@Override
public ServiceCache<T> build()
{
- if (executorService != null)
+ if (threadFactory != null)
{
- return new ServiceCacheImpl<T>(discovery, name, executorService);
+ return new ServiceCacheImpl<T>(discovery, name, threadFactory);
}
else
{
- return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+ return new ServiceCacheImpl<T>(discovery, name, executorService);
}
}
@@ -92,20 +91,9 @@ class ServiceCacheBuilderImpl<T> implements
ServiceCacheBuilder<T>
* @return this
*/
@Override
- public ServiceCacheBuilder<T> executorService(ExecutorService
executorService) {
- return executorService(new CloseableExecutorService(executorService,
false));
- }
-
- /**
- * Optional CloseableExecutorService to use for the cache's background
thread
- *
- * @param executorService an instance of CloseableExecutorService
- * @return this
- */
- @Override
- public ServiceCacheBuilder<T> executorService(CloseableExecutorService
executorService) {
+ public ServiceCacheBuilder<T> executorService(ExecutorService
executorService)
+ {
this.executorService = executorService;
- this.threadFactory = null;
return this;
}
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index c154836..ae24fa4 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -20,26 +20,27 @@
package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceInstance;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,19 +49,21 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
{
private final StandardListenerManager<ServiceCacheListener>
listenerContainer = StandardListenerManager.standard();
private final ServiceDiscoveryImpl<T> discovery;
- private final AtomicReference<State> state = new
AtomicReference<State>(State.LATENT);
- private final PathChildrenCache cache;
+ private final AtomicReference<State> state = new
AtomicReference<>(State.LATENT);
+ private final CuratorCacheBridge cache;
private final ConcurrentMap<String, ServiceInstance<T>> instances =
Maps.newConcurrentMap();
+ private final EnsureContainers ensureContainers;
+ private final CountDownLatch initializedLatch = new CountDownLatch(1);
private enum State
{
LATENT, STARTED, STOPPED
}
- private static CloseableExecutorService convertThreadFactory(ThreadFactory
threadFactory)
+ private static ExecutorService convertThreadFactory(ThreadFactory
threadFactory)
{
Preconditions.checkNotNull(threadFactory, "threadFactory cannot be
null");
- return new
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true);
+ return Executors.newSingleThreadExecutor(threadFactory);
}
ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name,
ThreadFactory threadFactory)
@@ -68,16 +71,24 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
this(discovery, name, convertThreadFactory(threadFactory));
}
- ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name,
CloseableExecutorService executorService)
+ ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name,
ExecutorService executorService)
{
Preconditions.checkNotNull(discovery, "discovery cannot be null");
Preconditions.checkNotNull(name, "name cannot be null");
- Preconditions.checkNotNull(executorService, "executorService cannot be
null");
this.discovery = discovery;
- cache = new PathChildrenCache(discovery.getClient(),
discovery.pathForName(name), true, false, executorService);
- cache.getListenable().addListener(this);
+ String path = discovery.pathForName(name);
+ cache = CuratorCache.bridgeBuilder(discovery.getClient(), path)
+ .withExecutorService(executorService)
+ .withDataNotCached()
+ .build();
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .forPathChildrenCache(discovery.getClient(), this)
+ .forInitialized(this::initialized)
+ .build();
+ cache.listenable().addListener(listener);
+ ensureContainers = new EnsureContainers(discovery.getClient(), path);
}
@Override
@@ -93,11 +104,19 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
@Override
public void start() throws Exception
{
+ startImmediate().await();
+ }
+
+ @Override
+ public CountDownLatch startImmediate() throws Exception
+ {
Preconditions.checkState(state.compareAndSet(State.LATENT,
State.STARTED), "Cannot be started more than once");
- cache.start(true);
+ ensureContainers.ensure();
+ cache.start();
if ( debugStartLatch != null )
{
+ initializedLatch.await();
debugStartLatch.countDown();
debugStartLatch = null;
}
@@ -107,18 +126,11 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
debugStartWaitLatch = null;
}
- for ( ChildData childData : cache.getCurrentData() )
- {
- if ( childData.getData() != null ) // else already processed by
the cache listener
- {
- addInstance(childData, true);
- }
- }
- discovery.cacheOpened(this);
+ return initializedLatch;
}
@Override
- public void close() throws IOException
+ public void close()
{
Preconditions.checkState(state.compareAndSet(State.STARTED,
State.STOPPED), "Already closed or has not been started");
@@ -152,7 +164,7 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
}
@Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event)
{
boolean notifyListeners = false;
switch ( event.getType() )
@@ -160,7 +172,7 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
case CHILD_ADDED:
case CHILD_UPDATED:
{
- addInstance(event.getData(), false);
+ addInstance(event.getData());
notifyListeners = true;
break;
}
@@ -173,7 +185,7 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
}
}
- if ( notifyListeners )
+ if ( notifyListeners && (initializedLatch.getCount() == 0) )
{
listenerContainer.forEach(ServiceCacheListener::cacheChanged);
}
@@ -184,18 +196,23 @@ public class ServiceCacheImpl<T> implements
ServiceCache<T>, PathChildrenCacheLi
return ZKPaths.getNodeFromPath(childData.getPath());
}
- private void addInstance(ChildData childData, boolean onlyIfAbsent) throws
Exception
+ private void addInstance(ChildData childData)
{
- String instanceId = instanceIdFromData(childData);
- ServiceInstance<T> serviceInstance =
discovery.getSerializer().deserialize(childData.getData());
- if ( onlyIfAbsent )
+ try
{
- instances.putIfAbsent(instanceId, serviceInstance);
+ String instanceId = instanceIdFromData(childData);
+ ServiceInstance<T> serviceInstance =
discovery.getSerializer().deserialize(childData.getData());
+ instances.put(instanceId, serviceInstance);
}
- else
+ catch ( Exception e )
{
- instances.put(instanceId, serviceInstance);
+ throw new RuntimeException(e);
}
- cache.clearDataBytes(childData.getPath(),
childData.getStat().getVersion());
+ }
+
+ private void initialized()
+ {
+ discovery.cacheOpened(this);
+ initializedLatch.countDown();
}
}
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..57d6f16 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements
ServiceDiscovery<T>
private static class Entry<T>
{
private volatile ServiceInstance<T> service;
- private volatile NodeCache cache;
+ private volatile CuratorCacheBridge cache;
private Entry(ServiceInstance<T> service)
{
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements
ServiceDiscovery<T>
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
- return new ServiceCacheBuilderImpl<T>(this)
- .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+ return new ServiceCacheBuilderImpl<T>(this);
}
/**
@@ -458,52 +458,48 @@ public class ServiceDiscoveryImpl<T> implements
ServiceDiscovery<T>
}
}
- private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+ private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
{
if ( !watchInstances )
{
return null;
}
- final NodeCache nodeCache = new NodeCache(client,
pathForInstance(instance.getName(), instance.getId()));
- try
- {
- nodeCache.start(true);
- }
- catch ( InterruptedException e)
- {
- Thread.currentThread().interrupt();
- return null;
- }
- catch ( Exception e )
- {
- log.error("Could not start node cache for: " + instance, e);
- }
- NodeCacheListener listener = new NodeCacheListener()
- {
- @Override
- public void nodeChanged() throws Exception
- {
- if ( nodeCache.getCurrentData() != null )
+ CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client,
pathForInstance(instance.getName(), instance.getId()))
+ .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+ .withDataNotCached()
+ .build();
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .afterInitialized()
+ .forAll((__, ___, data) -> {
+ if ( data != null )
{
- ServiceInstance<T> newInstance =
serializer.deserialize(nodeCache.getCurrentData().getData());
- Entry<T> entry = services.get(newInstance.getId());
- if ( entry != null )
+ try
{
- synchronized(entry)
+ ServiceInstance<T> newInstance =
serializer.deserialize(data.getData());
+ Entry<T> entry = services.get(newInstance.getId());
+ if ( entry != null )
{
- entry.service = newInstance;
+ synchronized(entry)
+ {
+ entry.service = newInstance;
+ }
}
}
+ catch ( Exception e )
+ {
+ log.debug("Could not deserialize: " + data.getPath());
+ }
}
else
{
log.warn("Instance data has been deleted for: " +
instance);
}
- }
- };
- nodeCache.getListenable().addListener(listener);
- return nodeCache;
+ })
+ .build();
+ cache.listenable().addListener(listener);
+ cache.start();
+ return cache;
}
private void internalUnregisterService(final Entry<T> entry) throws
Exception
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
index e36700b..2f0bbb2 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.discovery.details;
import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.DownInstancePolicy;
import org.apache.curator.x.discovery.InstanceFilter;
import org.apache.curator.x.discovery.ProviderStrategy;
@@ -39,7 +38,7 @@ class ServiceProviderBuilderImpl<T> implements
ServiceProviderBuilder<T>
private String serviceName;
private ProviderStrategy<T> providerStrategy;
private ThreadFactory threadFactory;
- private CloseableExecutorService executorService;
+ private ExecutorService executorService;
private List<InstanceFilter<T>> filters = Lists.newArrayList();
private DownInstancePolicy downInstancePolicy = new DownInstancePolicy();
@@ -111,12 +110,6 @@ class ServiceProviderBuilderImpl<T> implements
ServiceProviderBuilder<T>
@Override
public ServiceProviderBuilder<T> executorService(ExecutorService
executorService)
{
- return executorService(new CloseableExecutorService(executorService));
- }
-
- @Override
- public ServiceProviderBuilder<T> executorService(CloseableExecutorService
executorService)
- {
this.executorService = executorService;
this.threadFactory = null;
return this;
diff --git
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index d9787e4..45b4b9f 100644
---
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.discovery.details;
import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.x.discovery.DownInstancePolicy;
import org.apache.curator.x.discovery.InstanceFilter;
import org.apache.curator.x.discovery.ProviderStrategy;
@@ -51,38 +50,27 @@ public class ServiceProviderImpl<T> implements
ServiceProvider<T>
this(discovery, serviceName, providerStrategy, threadFactory, null,
filters, downInstancePolicy);
}
- public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService
executorService, List<InstanceFilter<T>> filters, DownInstancePolicy
downInstancePolicy)
- {
- this(discovery, serviceName, providerStrategy, null, executorService,
filters, downInstancePolicy);
- }
-
- protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory,
CloseableExecutorService executorService, List<InstanceFilter<T>> filters,
DownInstancePolicy downInstancePolicy)
+ protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String
serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory,
ExecutorService executorService, List<InstanceFilter<T>> filters,
DownInstancePolicy downInstancePolicy)
{
this.discovery = discovery;
this.providerStrategy = providerStrategy;
- downInstanceManager = new DownInstanceManager<T>(downInstancePolicy);
- final ServiceCacheBuilder builder =
discovery.serviceCacheBuilder().name(serviceName);
+ downInstanceManager = new DownInstanceManager<>(downInstancePolicy);
+ final ServiceCacheBuilder<T> builder =
discovery.serviceCacheBuilder().name(serviceName);
if (executorService != null)
{
builder.executorService(executorService);
} else
{
+ //noinspection deprecation
builder.threadFactory(threadFactory);
}
cache = builder.build();
ArrayList<InstanceFilter<T>> localFilters =
Lists.newArrayList(filters);
localFilters.add(downInstanceManager);
- localFilters.add(new InstanceFilter<T>()
- {
- @Override
- public boolean apply(ServiceInstance<T> instance)
- {
- return instance.isEnabled();
- }
- });
- instanceProvider = new FilteredInstanceProvider<T>(cache,
localFilters);
+ localFilters.add(ServiceInstance::isEnabled);
+ instanceProvider = new FilteredInstanceProvider<>(cache, localFilters);
}
/**
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
index 590c55c..3d357f6 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.strategies.RandomStrategy;
+import org.testng.annotations.Test;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class ServiceCacheLeakTester
{
public static void main(String[] args) throws Exception
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..6d24586 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,7 +27,9 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -40,6 +42,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestServiceCache extends BaseClassForTests
{
@Test
@@ -261,6 +264,11 @@ public class TestServiceCache extends BaseClassForTests
@Test
public void testExecutorServiceIsInvoked() throws Exception
{
+ if ( Compatibility.hasPersistentWatchers() )
+ {
+ return; // for ZK 3.6 the underlying cache ignores the executor
+ }
+
List<Closeable> closeables = Lists.newArrayList();
try
{
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..9604c19 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestServiceCacheRace extends BaseClassForTests
{
private final Timing timing = new Timing();
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index a1c8cfe..7a23e16 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -38,6 +39,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestServiceDiscovery extends BaseClassForTests
{
private static final Comparator<ServiceInstance<Void>> comparator = new
Comparator<ServiceInstance<Void>>()
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
index 312c884..fd0c438 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java
@@ -22,15 +22,17 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.testng.Assert;
import org.testng.annotations.Test;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestServiceDiscoveryBuilder extends BaseClassForTests
{
@Test
- public void testDefaultSerializer() throws Exception
+ public void testDefaultSerializer()
{
CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1));
ServiceDiscoveryBuilder<Object> builder =
ServiceDiscoveryBuilder.builder(Object.class).client(client);
@@ -41,7 +43,7 @@ public class TestServiceDiscoveryBuilder extends
BaseClassForTests
}
@Test
- public void testSetSerializer() throws Exception
+ public void testSetSerializer()
{
CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1));
ServiceDiscoveryBuilder<Object> builder =
ServiceDiscoveryBuilder.builder(Object.class).client(client);
diff --git
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
index d7358fe..b743e07 100644
---
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
+++
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -36,6 +37,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
public class TestServiceProvider extends BaseClassForTests
{
diff --git a/src/site/confluence/breaking-changes.confluence
b/src/site/confluence/breaking-changes.confluence
index 3170a16..af0dc04 100644
--- a/src/site/confluence/breaking-changes.confluence
+++ b/src/site/confluence/breaking-changes.confluence
@@ -8,3 +8,6 @@ need to use Curator with ZooKeeper 3.4.x you will need to use a
previous version
* Exhibitor support has been removed.
* {{ConnectionHandlingPolicy}} and related classes have been removed.
* The {{Reaper}} and {{ChildReaper}} classes/recipes have been removed. You
should use ZooKeeper container nodes instead.
+* {{newPersistentEphemeralNode}} and {{newPathChildrenCache}} were removed
from {{GroupMember}}
+* {{ServiceCacheBuilder<T> executorService(CloseableExecutorService
executorService)}} was removed from {{ServiceCacheBuilder}}
+* {{ServiceProviderBuilder<T> executorService(CloseableExecutorService
executorService)}} was removed from {{ServiceProviderBuilder}}