This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge in repository https://gitbox.apache.org/repos/asf/curator.git
commit e388abd6c86a13f1debed01b7bf3934dd3ecd3e9 Author: randgalt <randg...@apache.org> AuthorDate: Fri Mar 20 14:48:18 2020 -0500 CURATOR-549 Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, and for earlier versions creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested. --- .../cache/CompatibleCuratorCacheBridge.java | 131 +++++++++++++++++++++ .../framework/recipes/cache/CuratorCache.java | 15 +++ .../recipes/cache/CuratorCacheBridge.java | 58 +++++++++ .../recipes/cache/CuratorCacheBridgeBuilder.java | 57 +++++++++ .../cache/CuratorCacheBridgeBuilderImpl.java | 76 ++++++++++++ .../framework/recipes/cache/CuratorCacheImpl.java | 8 +- .../framework/recipes/nodes/GroupMember.java | 28 ++--- .../framework/recipes/nodes/TestGroupMember.java | 2 + .../x/async/modeled/details/ModeledCacheImpl.java | 65 +++++----- .../async/modeled/TestCachedModeledFramework.java | 2 + .../apache/curator/x/discovery/ServiceCache.java | 3 + .../curator/x/discovery/ServiceCacheBuilder.java | 10 -- .../x/discovery/ServiceProviderBuilder.java | 24 ++-- .../discovery/details/ServiceCacheBuilderImpl.java | 24 +--- .../x/discovery/details/ServiceCacheImpl.java | 83 +++++++------ .../x/discovery/details/ServiceDiscoveryImpl.java | 66 +++++------ .../details/ServiceProviderBuilderImpl.java | 9 +- .../x/discovery/details/ServiceProviderImpl.java | 24 +--- .../x/discovery/ServiceCacheLeakTester.java | 3 + .../curator/x/discovery/TestServiceCache.java | 8 ++ .../x/discovery/details/TestServiceCacheRace.java | 2 + .../x/discovery/details/TestServiceDiscovery.java | 2 + .../details/TestServiceDiscoveryBuilder.java | 6 +- .../x/discovery/details/TestServiceProvider.java | 2 + src/site/confluence/breaking-changes.confluence | 3 + 25 files changed, 526 insertions(+), 185 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java new file mode 100644 index 0000000..5dd71fd --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.stream.Stream; + +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*; + +/** + * Version of CuratorCacheBridge for pre-ZK 3.6 - uses TreeCache instead of CuratorCache + */ +@SuppressWarnings("deprecation") +class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener +{ + private final TreeCache cache; + private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); + private final String path; + + CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, ExecutorService executorService, boolean cacheData) + { + this.path = path; + Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); + TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData); + if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) ) + { + builder.setMaxDepth(0); + } + if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) ) + { + builder.setDataIsCompressed(true); + } + if ( executorService != null ) + { + builder.setExecutor(executorService); + } + cache = builder.build(); + } + + @Override + public void start() + { + try + { + cache.getListenable().addListener(this); + + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public void close() + { + cache.close(); + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return listenerManager; + } + + @Override + public Stream<ChildData> streamRootChildren() + { + Map<String, ChildData> currentChildren = cache.getCurrentChildren(path); + if ( currentChildren == null ) + { + return Stream.empty(); + } + return currentChildren.values().stream(); + } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + switch ( event.getType() ) + { + case NODE_ADDED: + { + listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData())); + break; + } + + case NODE_REMOVED: + { + listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null)); + break; + } + + case NODE_UPDATED: + { + listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData())); + break; + } + + case INITIALIZED: + { + listenerManager.forEach(CuratorCacheListener::initialized); + break; + } + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java index 7ebf56d..e1fcca9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java @@ -89,6 +89,21 @@ public interface CuratorCache extends Closeable, CuratorCacheAccessor } /** + * Start a Curator Cache Bridge builder. A Curator Cache Bridge is + * a facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if + * persistent watches are available or {@link org.apache.curator.framework.recipes.cache.TreeCache} + * otherwise (i.e. if you are using ZooKeeper 3.5.x). + * + * @param client Curator client + * @param path path to cache + * @return bridge builder + */ + static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path) + { + return new CuratorCacheBridgeBuilderImpl(client, path); + } + + /** * Start the cache. This will cause a complete refresh from the cache's root node and generate * events for all nodes found, etc. */ diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java new file mode 100644 index 0000000..e7d8868 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; +import java.util.stream.Stream; + +/** + * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if + * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache} + * otherwise + */ +@SuppressWarnings("deprecation") +public interface CuratorCacheBridge extends Closeable +{ + /** + * Start the cache. This will cause a complete refresh from the cache's root node and generate + * events for all nodes found, etc. + */ + void start(); + + /** + * Close the cache, stop responding to events, etc. + */ + @Override + void close(); + + /** + * Return the listener container so that listeners can be registered to be notified of changes to the cache + * + * @return listener container + */ + Listenable<CuratorCacheListener> listenable(); + + /** + * Return a stream over the storage entries that are the immediate children of the root node. + * + * @return stream over root entries + */ + Stream<ChildData> streamRootChildren(); +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java new file mode 100644 index 0000000..4a1920f --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilder.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import java.util.concurrent.ExecutorService; + +public interface CuratorCacheBridgeBuilder +{ + /** + * @param options any options + * @return this + */ + CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options); + + /** + * The bridge cache will not retain the data bytes. i.e. ChildData objects + * returned by the cache will always return {@code null} for {@link ChildData#getData()} + * + * @return this + */ + CuratorCacheBridgeBuilder withBytesNotCached(); + + /** + * If the old {@link org.apache.curator.framework.recipes.cache.TreeCache} is used by the bridge + * (i.e. you are using ZooKeeper 3.5.x) then this executor service is passed to {@link org.apache.curator.framework.recipes.cache.TreeCache.Builder#setExecutor(java.util.concurrent.ExecutorService)}. + * For {@link org.apache.curator.framework.recipes.cache.CuratorCache} this is not used and will be ignored (a warning will be logged). + * + * @param executorService executor to use for ZooKeeper 3.5.x + * @return this + */ + @SuppressWarnings("deprecation") + CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService); + + /** + * Return a new Curator Cache Bridge based on the builder methods that have been called + * + * @return new Curator Cache Bridge + */ + CuratorCacheBridge build(); +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java new file mode 100644 index 0000000..2076992 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; +import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; + +class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder +{ + private final CuratorFramework client; + private final String path; + private CuratorCache.Options[] options; + private boolean cacheData = true; + private ExecutorService executorService = null; + + CuratorCacheBridgeBuilderImpl(CuratorFramework client, String path) + { + this.client = client; + this.path = path; + } + + @Override + public CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options) + { + this.options = options; + return this; + } + + @Override + public CuratorCacheBridgeBuilder withBytesNotCached() + { + cacheData = false; + return this; + } + + @Override + public CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService) + { + this.executorService = executorService; + return this; + } + + @Override + public CuratorCacheBridge build() + { + if ( Compatibility.hasPersistentWatchers() ) + { + if ( executorService != null ) + { + LoggerFactory.getLogger(getClass()).warn("CuratorCache does not support custom ExecutorService"); + } + CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached(); + return new CuratorCacheImpl(client, storage, path, options, null, null); + } + return new CompatibleCuratorCacheBridge(client, path, options, executorService, cacheData); + } +} \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index c207b26..a5cc57c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -47,7 +47,7 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty import static org.apache.zookeeper.KeeperException.Code.NONODE; import static org.apache.zookeeper.KeeperException.Code.OK; -class CuratorCacheImpl implements CuratorCache +class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { private final Logger log = LoggerFactory.getLogger(getClass()); private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); @@ -136,6 +136,12 @@ class CuratorCacheImpl implements CuratorCache return storage.streamImmediateChildren(fromParent); } + @Override + public Stream<ChildData> streamRootChildren() + { + return storage.streamImmediateChildren(path); + } + @VisibleForTesting CuratorCacheStorage storage() { diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java index 8cd1f65..954b78d 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; import java.io.Closeable; +import java.util.Iterator; import java.util.Map; /** @@ -37,8 +40,8 @@ import java.util.Map; */ public class GroupMember implements Closeable { - private final PersistentEphemeralNode pen; - private final PathChildrenCache cache; + private final PersistentNode pen; + private final CuratorCacheBridge cache; private final String thisId; /** @@ -61,8 +64,8 @@ public class GroupMember implements Closeable { this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); - cache = newPathChildrenCache(client, membershipPath); - pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload); + cache = CuratorCache.bridgeBuilder(client, membershipPath).build(); + pen = new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload); } /** @@ -121,8 +124,11 @@ public class GroupMember implements Closeable { ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); boolean thisIdAdded = false; - for ( ChildData data : cache.getCurrentData() ) + + Iterator<ChildData> iterator = cache.streamRootChildren().iterator(); + while ( iterator.hasNext() ) { + ChildData data = iterator.next(); String id = idFromPath(data.getPath()); thisIdAdded = thisIdAdded || id.equals(thisId); builder.put(id, data.getData()); @@ -144,14 +150,4 @@ public class GroupMember implements Closeable { return ZKPaths.getNodeFromPath(path); } - - protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload) - { - return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload); - } - - protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath) - { - return new PathChildrenCache(client, membershipPath, true); - } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java index 2da051f..4395b36 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java @@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Map; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestGroupMember extends BaseClassForTests { // NOTE - don't need many tests as this class is just a wrapper around two existing recipes diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 5c015f4..e324ac4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -19,9 +19,13 @@ package org.apache.curator.x.async.modeled.details; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.StandardListenerManager; -import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridgeBuilder; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.utils.ThreadUtils; @@ -42,11 +46,12 @@ import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> { - private final TreeCache cache; + private final CuratorCacheBridge cache; private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; private final StandardListenerManager<ModeledCacheListener<T>> listenerContainer = StandardListenerManager.standard(); private final ZPath basePath; + private final EnsureContainers ensureContainers; private static final class Entry<T> { @@ -69,19 +74,32 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> basePath = modelSpec.path(); this.serializer = modelSpec.serializer(); - cache = TreeCache.newBuilder(client, basePath.fullPath()) - .setCacheData(false) - .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) - .setExecutor(executor) - .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) - .build(); + CuratorCacheBridgeBuilder bridgeBuilder = CuratorCache.bridgeBuilder(client, basePath.fullPath()).withBytesNotCached().withExecutorService(executor); + if ( modelSpec.createOptions().contains(CreateOption.compress) ) + { + bridgeBuilder = bridgeBuilder.withOptions(CuratorCache.Options.COMPRESSED_DATA); + } + cache = bridgeBuilder.build(); + cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client, this).build()); + + if ( modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers) ) + { + ensureContainers = new EnsureContainers(client, basePath.fullPath()); + } + else + { + ensureContainers = null; + } } public void start() { try { - cache.getListenable().addListener(this); + if ( ensureContainers != null ) + { + ensureContainers.ensure(); + } cache.start(); } catch ( Exception e ) @@ -92,7 +110,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> public void close() { - cache.getListenable().removeListener(this); cache.close(); entries.clear(); } @@ -148,7 +165,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> } } - private void internalChildEvent(TreeCacheEvent event) throws Exception + private void internalChildEvent(TreeCacheEvent event) { switch ( event.getType() ) { @@ -156,16 +173,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> case NODE_UPDATED: { ZPath path = ZPath.parse(event.getData().getPath()); - if ( !path.equals(basePath) ) + byte[] bytes = event.getData().getData(); + if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created { - byte[] bytes = event.getData().getData(); - if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created - { - T model = serializer.deserialize(bytes); - entries.put(path, new Entry<>(event.getData().getStat(), model)); - ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; - accept(type, path, event.getData().getStat(), model); - } + T model = serializer.deserialize(bytes); + entries.put(path, new Entry<>(event.getData().getStat(), model)); + ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; + accept(type, path, event.getData().getStat(), model); } break; } @@ -173,13 +187,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> case NODE_REMOVED: { ZPath path = ZPath.parse(event.getData().getPath()); - if ( !path.equals(basePath) ) - { - Entry<T> entry = entries.remove(path); - T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); - Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); - accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model); - } + Entry<T> entry = entries.remove(path); + T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); + Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); + accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model); break; } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 2d33c13..70f9c66 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled; import com.google.common.collect.Sets; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; @@ -38,6 +39,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestCachedModeledFramework extends TestModeledFrameworkBase { @Test diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d69..d409f99 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; +import java.util.concurrent.CountDownLatch; public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T> { @@ -41,4 +42,6 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe * @throws Exception errors */ public void start() throws Exception; + + CountDownLatch startImmediate() throws Exception; } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java index 326a16c..cfec2c4 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.discovery; -import org.apache.curator.utils.CloseableExecutorService; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -59,13 +58,4 @@ public interface ServiceCacheBuilder<T> * @return this */ public ServiceCacheBuilder<T> executorService(ExecutorService executorService); - - /** - * Optional CloseableExecutorService to use for the cache's background thread. The specified ExecutorService - * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder. - * - * @param executorService an instance of CloseableExecutorService - * @return this - */ - public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService); } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java index 02948a3..4c0544d 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProviderBuilder.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.discovery; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -30,7 +29,7 @@ public interface ServiceProviderBuilder<T> * * @return provider */ - public ServiceProvider<T> build(); + ServiceProvider<T> build(); /** * required - set the name of the service to be provided @@ -38,7 +37,7 @@ public interface ServiceProviderBuilder<T> * @param serviceName the name of the service * @return this */ - public ServiceProviderBuilder<T> serviceName(String serviceName); + ServiceProviderBuilder<T> serviceName(String serviceName); /** * optional - set the provider strategy. The default is {@link RoundRobinStrategy} @@ -46,7 +45,7 @@ public interface ServiceProviderBuilder<T> * @param providerStrategy strategy to use * @return this */ - public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> providerStrategy); + ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> providerStrategy); /** * optional - the thread factory to use for creating internal threads. The specified ThreadFactory overrides @@ -57,7 +56,7 @@ public interface ServiceProviderBuilder<T> * @deprecated use {@link #executorService(ExecutorService)} instead */ @Deprecated - public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory); + ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory); /** * Set the down instance policy @@ -65,7 +64,7 @@ public interface ServiceProviderBuilder<T> * @param downInstancePolicy new policy * @return this */ - public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy); + ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy); /** * Add an instance filter. NOTE: this does not remove previously added filters. i.e. @@ -75,7 +74,7 @@ public interface ServiceProviderBuilder<T> * @param filter filter to add * @return this */ - public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter); + ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> filter); /** * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService @@ -85,14 +84,5 @@ public interface ServiceProviderBuilder<T> * @param executorService executor service * @return this */ - public ServiceProviderBuilder<T> executorService(ExecutorService executorService); - - /** - * Optional CloseableExecutorService to use for the cache's background thread. The specified CloseableExecutorService - * overrides any prior ThreadFactory or CloseableExecutorService set on the ServiceProviderBuilder. - * - * @param executorService an instance of CloseableExecutorService - * @return this - */ - public ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService); + ServiceProviderBuilder<T> executorService(ExecutorService executorService); } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java index 7647c0f..cf7f468 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.discovery.details; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceCacheBuilder; import java.util.concurrent.ExecutorService; @@ -32,7 +31,7 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> private ServiceDiscoveryImpl<T> discovery; private String name; private ThreadFactory threadFactory; - private CloseableExecutorService executorService; + private ExecutorService executorService; ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery) { @@ -47,13 +46,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> @Override public ServiceCache<T> build() { - if (executorService != null) + if (threadFactory != null) { - return new ServiceCacheImpl<T>(discovery, name, executorService); + return new ServiceCacheImpl<T>(discovery, name, threadFactory); } else { - return new ServiceCacheImpl<T>(discovery, name, threadFactory); + return new ServiceCacheImpl<T>(discovery, name, executorService); } } @@ -92,20 +91,9 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> * @return this */ @Override - public ServiceCacheBuilder<T> executorService(ExecutorService executorService) { - return executorService(new CloseableExecutorService(executorService, false)); - } - - /** - * Optional CloseableExecutorService to use for the cache's background thread - * - * @param executorService an instance of CloseableExecutorService - * @return this - */ - @Override - public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) { + public ServiceCacheBuilder<T> executorService(ExecutorService executorService) + { this.executorService = executorService; - this.threadFactory = null; return this; } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index c154836..f0316ac 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -20,26 +20,27 @@ package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; -import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; @@ -48,19 +49,21 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { private final StandardListenerManager<ServiceCacheListener> listenerContainer = StandardListenerManager.standard(); private final ServiceDiscoveryImpl<T> discovery; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - private final PathChildrenCache cache; + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final CuratorCacheBridge cache; private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); + private final EnsureContainers ensureContainers; + private final CountDownLatch initializedLatch = new CountDownLatch(1); private enum State { LATENT, STARTED, STOPPED } - private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) + private static ExecutorService convertThreadFactory(ThreadFactory threadFactory) { Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); - return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true); + return Executors.newSingleThreadExecutor(threadFactory); } ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory) @@ -68,16 +71,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi this(discovery, name, convertThreadFactory(threadFactory)); } - ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService) + ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ExecutorService executorService) { Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); - Preconditions.checkNotNull(executorService, "executorService cannot be null"); this.discovery = discovery; - cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService); - cache.getListenable().addListener(this); + String path = discovery.pathForName(name); + cache = CuratorCache.bridgeBuilder(discovery.getClient(), path) + .withExecutorService(executorService) + .withBytesNotCached() + .build(); + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(discovery.getClient(), this) + .forInitialized(this::initialized) + .build(); + cache.listenable().addListener(listener); + ensureContainers = new EnsureContainers(discovery.getClient(), path); } @Override @@ -93,11 +104,19 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void start() throws Exception { + startImmediate().await(); + } + + @Override + public CountDownLatch startImmediate() throws Exception + { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - cache.start(true); + ensureContainers.ensure(); + cache.start(); if ( debugStartLatch != null ) { + initializedLatch.await(); debugStartLatch.countDown(); debugStartLatch = null; } @@ -107,18 +126,11 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi debugStartWaitLatch = null; } - for ( ChildData childData : cache.getCurrentData() ) - { - if ( childData.getData() != null ) // else already processed by the cache listener - { - addInstance(childData, true); - } - } - discovery.cacheOpened(this); + return initializedLatch; } @Override - public void close() throws IOException + public void close() { Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); @@ -152,7 +164,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi } @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { boolean notifyListeners = false; switch ( event.getType() ) @@ -160,7 +172,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi case CHILD_ADDED: case CHILD_UPDATED: { - addInstance(event.getData(), false); + addInstance(event.getData()); notifyListeners = true; break; } @@ -173,7 +185,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi } } - if ( notifyListeners ) + if ( notifyListeners && (initializedLatch.getCount() == 0) ) { listenerContainer.forEach(ServiceCacheListener::cacheChanged); } @@ -184,18 +196,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstance(ChildData childData) { - String instanceId = instanceIdFromData(childData); - ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) + try { - instances.putIfAbsent(instanceId, serviceInstance); + String instanceId = instanceIdFromData(childData); + ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + instances.put(instanceId, serviceInstance); } - else + catch ( Exception e ) { - instances.put(instanceId, serviceInstance); + throw new RuntimeException(e); } - cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private void initialized() + { + discovery.cacheOpened(this); + initializedLatch.countDown(); } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 476705c..c072213 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -26,8 +26,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; @@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private static class Entry<T> { private volatile ServiceInstance<T> service; - private volatile NodeCache cache; + private volatile CuratorCacheBridge cache; private Entry(ServiceInstance<T> service) { @@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public ServiceCacheBuilder<T> serviceCacheBuilder() { - return new ServiceCacheBuilderImpl<T>(this) - .threadFactory(ThreadUtils.newThreadFactory("ServiceCache")); + return new ServiceCacheBuilderImpl<T>(this); } /** @@ -458,52 +458,48 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> } } - private NodeCache makeNodeCache(final ServiceInstance<T> instance) + private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance) { if ( !watchInstances ) { return null; } - final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId())); - try - { - nodeCache.start(true); - } - catch ( InterruptedException e) - { - Thread.currentThread().interrupt(); - return null; - } - catch ( Exception e ) - { - log.error("Could not start node cache for: " + instance, e); - } - NodeCacheListener listener = new NodeCacheListener() - { - @Override - public void nodeChanged() throws Exception - { - if ( nodeCache.getCurrentData() != null ) + CuratorCacheBridge cache = CuratorCache.bridgeBuilder(client, pathForInstance(instance.getName(), instance.getId())) + .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE) + .withBytesNotCached() + .build(); + CuratorCacheListener listener = CuratorCacheListener.builder() + .afterInitialized() + .forAll((__, ___, data) -> { + if ( data != null ) { - ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData()); - Entry<T> entry = services.get(newInstance.getId()); - if ( entry != null ) + try { - synchronized(entry) + ServiceInstance<T> newInstance = serializer.deserialize(data.getData()); + Entry<T> entry = services.get(newInstance.getId()); + if ( entry != null ) { - entry.service = newInstance; + synchronized(entry) + { + entry.service = newInstance; + } } } + catch ( Exception e ) + { + log.debug("Could not deserialize: " + data.getPath()); + } } else { log.warn("Instance data has been deleted for: " + instance); } - } - }; - nodeCache.getListenable().addListener(listener); - return nodeCache; + }) + .build(); + cache.listenable().addListener(listener); + cache.start(); + return cache; } private void internalUnregisterService(final Entry<T> entry) throws Exception diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java index e36700b..2f0bbb2 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderBuilderImpl.java @@ -19,7 +19,6 @@ package org.apache.curator.x.discovery.details; import com.google.common.collect.Lists; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.InstanceFilter; import org.apache.curator.x.discovery.ProviderStrategy; @@ -39,7 +38,7 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T> private String serviceName; private ProviderStrategy<T> providerStrategy; private ThreadFactory threadFactory; - private CloseableExecutorService executorService; + private ExecutorService executorService; private List<InstanceFilter<T>> filters = Lists.newArrayList(); private DownInstancePolicy downInstancePolicy = new DownInstancePolicy(); @@ -111,12 +110,6 @@ class ServiceProviderBuilderImpl<T> implements ServiceProviderBuilder<T> @Override public ServiceProviderBuilder<T> executorService(ExecutorService executorService) { - return executorService(new CloseableExecutorService(executorService)); - } - - @Override - public ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService) - { this.executorService = executorService; this.threadFactory = null; return this; diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java index d9787e4..45b4b9f 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java @@ -19,7 +19,6 @@ package org.apache.curator.x.discovery.details; import com.google.common.collect.Lists; -import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.InstanceFilter; import org.apache.curator.x.discovery.ProviderStrategy; @@ -51,38 +50,27 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T> this(discovery, serviceName, providerStrategy, threadFactory, null, filters, downInstancePolicy); } - public ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, CloseableExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy) - { - this(discovery, serviceName, providerStrategy, null, executorService, filters, downInstancePolicy); - } - - protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, CloseableExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy) + protected ServiceProviderImpl(ServiceDiscoveryImpl<T> discovery, String serviceName, ProviderStrategy<T> providerStrategy, ThreadFactory threadFactory, ExecutorService executorService, List<InstanceFilter<T>> filters, DownInstancePolicy downInstancePolicy) { this.discovery = discovery; this.providerStrategy = providerStrategy; - downInstanceManager = new DownInstanceManager<T>(downInstancePolicy); - final ServiceCacheBuilder builder = discovery.serviceCacheBuilder().name(serviceName); + downInstanceManager = new DownInstanceManager<>(downInstancePolicy); + final ServiceCacheBuilder<T> builder = discovery.serviceCacheBuilder().name(serviceName); if (executorService != null) { builder.executorService(executorService); } else { + //noinspection deprecation builder.threadFactory(threadFactory); } cache = builder.build(); ArrayList<InstanceFilter<T>> localFilters = Lists.newArrayList(filters); localFilters.add(downInstanceManager); - localFilters.add(new InstanceFilter<T>() - { - @Override - public boolean apply(ServiceInstance<T> instance) - { - return instance.isEnabled(); - } - }); - instanceProvider = new FilteredInstanceProvider<T>(cache, localFilters); + localFilters.add(ServiceInstance::isEnabled); + instanceProvider = new FilteredInstanceProvider<>(cache, localFilters); } /** diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java index 590c55c..3d357f6 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTester.java @@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.strategies.RandomStrategy; +import org.testng.annotations.Test; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class ServiceCacheLeakTester { public static void main(String[] args) throws Exception diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26..6d24586 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -27,7 +27,9 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.Compatibility; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,6 +42,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestServiceCache extends BaseClassForTests { @Test @@ -261,6 +264,11 @@ public class TestServiceCache extends BaseClassForTests @Test public void testExecutorServiceIsInvoked() throws Exception { + if ( Compatibility.hasPersistentWatchers() ) + { + return; // for ZK 3.6 the underlying cache ignores the executor + } + List<Closeable> closeables = Lists.newArrayList(); try { diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java index 06d63b9..9604c19 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceCache; @@ -40,6 +41,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestServiceCacheRace extends BaseClassForTests { private final Timing timing = new Timing(); diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java index a1c8cfe..7a23e16 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; @@ -38,6 +39,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestServiceDiscovery extends BaseClassForTests { private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>() diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java index 312c884..fd0c438 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscoveryBuilder.java @@ -22,15 +22,17 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.testng.Assert; import org.testng.annotations.Test; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestServiceDiscoveryBuilder extends BaseClassForTests { @Test - public void testDefaultSerializer() throws Exception + public void testDefaultSerializer() { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); ServiceDiscoveryBuilder<Object> builder = ServiceDiscoveryBuilder.builder(Object.class).client(client); @@ -41,7 +43,7 @@ public class TestServiceDiscoveryBuilder extends BaseClassForTests } @Test - public void testSetSerializer() throws Exception + public void testSetSerializer() { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); ServiceDiscoveryBuilder<Object> builder = ServiceDiscoveryBuilder.builder(Object.class).client(client); diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java index d7358fe..b743e07 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; @@ -36,6 +37,7 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; +@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup) public class TestServiceProvider extends BaseClassForTests { diff --git a/src/site/confluence/breaking-changes.confluence b/src/site/confluence/breaking-changes.confluence index 3170a16..af0dc04 100644 --- a/src/site/confluence/breaking-changes.confluence +++ b/src/site/confluence/breaking-changes.confluence @@ -8,3 +8,6 @@ need to use Curator with ZooKeeper 3.4.x you will need to use a previous version * Exhibitor support has been removed. * {{ConnectionHandlingPolicy}} and related classes have been removed. * The {{Reaper}} and {{ChildReaper}} classes/recipes have been removed. You should use ZooKeeper container nodes instead. +* {{newPersistentEphemeralNode(}} and {{newPathChildrenCache}} were removed from {{GroupMember}} +* {{ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService)} was removed from {{ServiceCacheBuilder}} +* {{ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService);)} was removed from {{ServiceProviderBuilder}}