This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge in repository https://gitbox.apache.org/repos/asf/curator.git
commit 20acdac9b1e2979236823997b832fefe24d56a16 Author: randgalt <randg...@apache.org> AuthorDate: Sun Nov 3 17:53:02 2019 -0500 CURATOR-549 Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, and for earlier versions creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested. --- .../cache/CompatibleCuratorCacheBridge.java | 113 ++++++++++++++++ .../recipes/cache/CuratorCacheBridge.java | 33 +++-- .../recipes/cache/CuratorCacheBuilder.java | 7 + .../recipes/cache/CuratorCacheBuilderImpl.java | 21 ++- .../framework/recipes/cache/CuratorCacheImpl.java | 2 +- .../cache/PathChildrenCacheListenerWrapper.java | 11 +- .../curator/framework/recipes/cache/TreeCache.java | 36 ++++-- curator-test-zk35/pom.xml | 46 +++++++ curator-x-async/pom.xml | 15 +++ .../details/CachedModeledFrameworkImpl.java | 11 +- .../x/async/modeled/details/ModeledCacheImpl.java | 50 +++++-- .../modeled/details/ModeledFrameworkImpl.java | 4 +- .../async/modeled/TestCachedModeledFramework.java | 2 + .../x/async/modeled/TestModeledFrameworkBase.java | 4 +- curator-x-discovery/pom.xml | 15 +++ .../apache/curator/x/discovery/ServiceCache.java | 20 ++- .../curator/x/discovery/ServiceProvider.java | 24 +++- .../discovery/details/ServiceCacheBuilderImpl.java | 6 +- .../x/discovery/details/ServiceCacheImpl.java | 143 ++++++++++++--------- .../x/discovery/details/ServiceDiscoveryImpl.java | 66 +++++----- .../x/discovery/details/ServiceProviderImpl.java | 8 ++ .../curator/x/discovery/TestServiceCache.java | 2 + .../x/discovery/details/TestServiceDiscovery.java | 2 + .../x/discovery/details/TestWatchedInstances.java | 2 + pom.xml | 14 ++ 25 files changed, 497 insertions(+), 160 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java new file mode 100644 index 0000000..a09196f --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.Executor; + +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*; + +class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener +{ + private final TreeCache cache; + private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); + + CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor) + { + Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); + TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(false); + if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) ) + { + builder.setMaxDepth(0); + } + if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) ) + { + builder.setDataIsCompressed(true); + } + if ( executor != null ) + { + builder.setExecutor(executor); + } + cache = builder.build(); + } + + @Override + public void start() + { + try + { + cache.getListenable().addListener(this); + + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public void close() + { + cache.close(); + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return listenerManager; + } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + switch ( event.getType() ) + { + case NODE_ADDED: + { + listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData())); + break; + } + + case NODE_REMOVED: + { + listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null)); + break; + } + + case NODE_UPDATED: + { + listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData())); + break; + } + + case INITIALIZED: + { + listenerManager.forEach(CuratorCacheListener::initialized); + break; + } + } + } +} diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java similarity index 54% copy from curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java copy to curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java index a122d69..7103877 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java @@ -16,29 +16,34 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.x.discovery; +package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.x.discovery.details.InstanceProvider; -import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; -import java.util.List; -public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T> +/** + * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if + * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache} + * otherwise + */ +public interface CuratorCacheBridge extends Closeable { /** - * Return the current list of instances. NOTE: there is no guarantee of freshness. This is - * merely the last known list of instances. However, the list is updated via a ZooKeeper watcher - * so it should be fresh within a window of a second or two. - * - * @return the list + * Start the cache. This will cause a complete refresh from the cache's root node and generate + * events for all nodes found, etc. + */ + void start(); + + /** + * Close the cache, stop responding to events, etc. */ - public List<ServiceInstance<T>> getInstances(); + @Override + void close(); /** - * The cache must be started before use + * Return the listener container so that listeners can be registered to be notified of changes to the cache * - * @throws Exception errors + * @return listener container */ - public void start() throws Exception; + Listenable<CuratorCacheListener> listenable(); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java index 35a5f26..ab80a6f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java @@ -60,4 +60,11 @@ public interface CuratorCacheBuilder * @return new Curator Cache */ CuratorCache build(); + + /** + * Return a new bridge cache based on the builder methods that have been called. + * + * @return new bridge cache + */ + CuratorCacheBridge buildBridge(); } \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java index 9f9e03d..43491b1 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java @@ -19,7 +19,9 @@ package org.apache.curator.framework.recipes.cache; +import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder @Override public CuratorCache build() { - return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler); + return internalBuild(); + } + + @Override + public CuratorCacheBridge buildBridge() + { + Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge"); + if ( Compatibility.hasPersistentWatchers() ) + { + return internalBuild(); + } + Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge"); + return new CompatibleCuratorCacheBridge(client, path, options, executor); + } + + private CuratorCacheImpl internalBuild() + { + return new CuratorCacheImpl(client, CuratorCacheStorage.bytesNotCached(), path, options, executor, exceptionHandler); } } \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index 8916399..1e62a39 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -44,7 +44,7 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Ty import static org.apache.zookeeper.KeeperException.Code.NONODE; import static org.apache.zookeeper.KeeperException.Code.OK; -class CuratorCacheImpl implements CuratorCache +class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { private final Logger log = LoggerFactory.getLogger(getClass()); private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java index a9123c1..7e9730c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java @@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener { case NODE_CREATED: { - sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)); break; } case NODE_CHANGED: { - sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)); break; } case NODE_DELETED: { - sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData)); break; } } @@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener @Override public void initialized() { - sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)); } - private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type) + private void sendEvent(PathChildrenCacheEvent event) { - PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node); try { listener.childEvent(client, event); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index e2f3a8b..121e72c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -52,6 +52,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath; public class TreeCache implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + private final Executor executor; private final boolean createParentNodes; private final boolean disableZkWatches; private final TreeCacheSelector selector; @@ -89,6 +91,7 @@ public class TreeCache implements Closeable private boolean cacheData = true; private boolean dataIsCompressed = false; private ExecutorService executorService = null; + private Executor executor = null; private int maxDepth = Integer.MAX_VALUE; private boolean createParentNodes = false; private boolean disableZkWatches = false; @@ -105,12 +108,12 @@ public class TreeCache implements Closeable */ public TreeCache build() { - ExecutorService executor = executorService; - if ( executor == null ) + ExecutorService localExecutorService = executorService; + if ( (localExecutorService == null) && (executor == null) ) { - executor = Executors.newSingleThreadExecutor(defaultThreadFactory); + localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory); } - return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector); + return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector); } /** @@ -149,6 +152,15 @@ public class TreeCache implements Closeable } /** + * Sets the executor to publish events; a default executor will be created if not specified. + */ + public Builder setExecutor(Executor executor) + { + this.executor = checkNotNull(executor); + return this; + } + + /** * Sets the maximum depth to explore/watch. A {@code maxDepth} of {@code 0} will watch only * the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the * root node and its immediate children (kind of like {@link PathChildrenCache}. @@ -564,7 +576,7 @@ public class TreeCache implements Closeable */ public TreeCache(CuratorFramework client, String path) { - this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector()); + this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector()); } /** @@ -573,12 +585,14 @@ public class TreeCache implements Closeable * @param cacheData if true, node contents are cached in addition to the stat * @param dataIsCompressed if true, data in the path is compressed * @param executorService Closeable ExecutorService to use for the TreeCache's background thread + * @param executor executor to use for the TreeCache's background thread * @param createParentNodes true to create parent nodes as containers * @param disableZkWatches true to disable Zookeeper watches * @param selector the selector to use */ - TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector) + TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector) { + this.executor = executor; this.createParentNodes = createParentNodes; this.selector = Preconditions.checkNotNull(selector, "selector cannot be null"); this.root = new TreeNode(validatePath(path), null); @@ -588,7 +602,7 @@ public class TreeCache implements Closeable this.dataIsCompressed = dataIsCompressed; this.maxDepth = maxDepth; this.disableZkWatches = disableZkWatches; - this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null"); + this.executorService = executorService; } /** @@ -623,7 +637,10 @@ public class TreeCache implements Closeable client.removeWatchers(); client.getConnectionStateListenable().removeListener(connectionStateListener); listeners.clear(); - executorService.shutdown(); + if ( executorService != null ) + { + executorService.shutdown(); + } try { root.wasDeleted(); @@ -857,8 +874,9 @@ public class TreeCache implements Closeable { if ( treeState.get() != TreeState.CLOSED ) { + Executor localExecutor = (executorService != null) ? executorService : executor; LOG.debug("publishEvent: {}", event); - executorService.submit(new Runnable() + localExecutor.execute(new Runnable() { @Override public void run() diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml index 5803893..4ca8356 100644 --- a/curator-test-zk35/pom.xml +++ b/curator-test-zk35/pom.xml @@ -89,6 +89,18 @@ <dependency> <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <exclusions> <exclusion> @@ -114,6 +126,32 @@ </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <scope>test</scope> @@ -124,6 +162,12 @@ <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -151,6 +195,8 @@ <dependenciesToScan> <dependency>org.apache.curator:curator-framework</dependency> <dependency>org.apache.curator:curator-recipes</dependency> + <dependency>org.apache.curator:curator-x-async</dependency> + <dependency>org.apache.curator:curator-x-discovery</dependency> </dependenciesToScan> <groups>zk35,zk35Compatibility</groups> <excludedGroups>zk36</excludedGroups> diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml index 5ffd774..a32dbd3 100644 --- a/curator-x-async/pom.xml +++ b/curator-x-async/pom.xml @@ -49,4 +49,19 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index c897b4e..2dd5625 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { private final ModeledFramework<T> client; private final ModeledCacheImpl<T> cache; - private final Executor executor; CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor)); } - private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache) { this.client = client; this.cache = cache; - this.executor = executor; } @Override @@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> child(Object child) { - return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.child(child), cache); } @Override @@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache); } @Override diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index b95e92d..54f01c1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -19,34 +19,42 @@ package org.apache.curator.x.async.modeled.details; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.ModeledCache; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import org.apache.curator.x.async.modeled.ZNode; import org.apache.zookeeper.data.Stat; import java.util.AbstractMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> { - private final TreeCache cache; + private final CuratorCacheBridge cache; private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>(); private final ZPath basePath; + private final CuratorFramework client; + private final Set<CreateOption> options; private static final class Entry<T> { @@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor) { + this.client = client; if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() ) { modelSpec = modelSpec.parent(); // i.e. the last item is a parameter @@ -69,19 +78,39 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> basePath = modelSpec.path(); this.serializer = modelSpec.serializer(); - cache = TreeCache.newBuilder(client, basePath.fullPath()) - .setCacheData(false) - .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) - .setExecutor(executor) - .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) - .build(); + options = modelSpec.createOptions(); + CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath()); + if ( modelSpec.createOptions().contains(CreateOption.compress) ) + { + builder.withOptions(CuratorCache.Options.COMPRESSED_DATA); + } + if ( executor != null ) + { + builder.withExecutor(executor); + } + cache = builder.buildBridge(); } public void start() { + CuratorCacheListener listener = CuratorCacheListener.builder() + .forTreeCache(client, this) + .build(); try { - cache.getListenable().addListener(this); + if ( options.contains(CreateOption.createParentsIfNeeded) ) + { + if ( options.contains(CreateOption.createParentsAsContainers) ) + { + new EnsureContainers(client, basePath.fullPath()).ensure(); + } + else + { + ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false); + } + } + + cache.listenable().addListener(listener); cache.start(); } catch ( Exception e ) @@ -92,7 +121,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> public void close() { - cache.getListenable().removeListener(this); cache.close(); entries.clear(); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index dbbf3cb..799845f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public CachedModeledFramework<T> cached() { - return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework")); + return cached(null); } @Override public CachedModeledFramework<T> cached(ExecutorService executor) { Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers."); - return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null")); + return new CachedModeledFrameworkImpl<>(this, null); } @Override diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 2d33c13..830ee2b 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled; import com.google.common.collect.Sets; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; @@ -38,6 +39,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestCachedModeledFramework extends TestModeledFrameworkBase { @Test diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java index 61a4570..5660539 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java @@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests protected ModelSpec<TestNewerModel> newModelSpec; protected AsyncCuratorFramework async; - @BeforeMethod + @BeforeMethod(alwaysRun = true) @Override public void setup() throws Exception { @@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests newModelSpec = ModelSpec.builder(path, newSerializer).build(); } - @AfterMethod + @AfterMethod(alwaysRun = true) @Override public void teardown() throws Exception { diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml index 824231d..50b2dea 100644 --- a/curator-x-discovery/pom.xml +++ b/curator-x-discovery/pom.xml @@ -80,4 +80,19 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d69..270005e 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; +import java.util.concurrent.CountDownLatch; public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T> { @@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe * * @return the list */ - public List<ServiceInstance<T>> getInstances(); + List<ServiceInstance<T>> getInstances(); /** - * The cache must be started before use + * The cache must be started before use. This method blocks while the internal + * cache is loaded. * * @throws Exception errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * The cache must be started before use. This version returns immediately. + * Use the returned latch to block until the cache is loaded + * + * @return a latch that can be used to block until the cache is loaded + * @throws Exception errors + */ + default CountDownLatch startImmediate() throws Exception + { + throw new UnsupportedOperationException(); + } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java index f542ed3..d09885b 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java @@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider; import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.CountDownLatch; /** * The main API for Discovery. This class is essentially a facade over a {@link ProviderStrategy} @@ -31,11 +32,24 @@ import java.util.Collection; public interface ServiceProvider<T> extends Closeable { /** - * The provider must be started before use + * The provider must be started before use. This method blocks while the internal + * cache is loaded. * * @throws Exception any errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * The provider must be started before use. This version returns immediately. + * Use the returned latch to block until the cache is loaded + * + * @return a latch that can be used to block until the cache is loaded + * @throws Exception errors + */ + default CountDownLatch startImmediate() throws Exception + { + throw new UnsupportedOperationException(); + } /** * Return an instance for a single use. <b>IMPORTANT: </b> users @@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable * @return the instance to use * @throws Exception any errors */ - public ServiceInstance<T> getInstance() throws Exception; + ServiceInstance<T> getInstance() throws Exception; /** * Return the current available set of instances <b>IMPORTANT: </b> users @@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable * @return all known instances * @throws Exception any errors */ - public Collection<ServiceInstance<T>> getAllInstances() throws Exception; + Collection<ServiceInstance<T>> getAllInstances() throws Exception; /** * Take note of an error connecting to the given instance. The instance will potentially @@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable * * @param instance instance that had an error */ - public void noteError(ServiceInstance<T> instance); + void noteError(ServiceInstance<T> instance); /** * Close the provider. Note: it's the provider's responsibility to close any caches it manages diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java index 8922233..501e289 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java @@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> @Override public ServiceCache<T> build() { - if (executorService != null) + if (threadFactory != null) { - return new ServiceCacheImpl<T>(discovery, name, executorService); + return new ServiceCacheImpl<T>(discovery, name, threadFactory); } else { - return new ServiceCacheImpl<T>(discovery, name, threadFactory); + return new ServiceCacheImpl<T>(discovery, name, executorService); } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index d1a31ad..a752ead 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; @@ -23,14 +24,20 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.curator.utils.CloseableExecutorService; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCacheStorage; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; @@ -45,17 +52,17 @@ import java.util.concurrent.atomic.AtomicReference; public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { - private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); - private final ServiceDiscoveryImpl<T> discovery; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - private final PathChildrenCache cache; - private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); + private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); + private final ServiceDiscoveryImpl<T> discovery; + private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final CuratorCacheBridge cache; + private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); + private final CountDownLatch initializedLatch = new CountDownLatch(1); + private String path; private enum State { - LATENT, - STARTED, - STOPPED + LATENT, STARTED, STOPPED } private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) @@ -73,12 +80,20 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); - Preconditions.checkNotNull(executorService, "executorService cannot be null"); this.discovery = discovery; - cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService); - cache.getListenable().addListener(this); + path = discovery.pathForName(name); + + CuratorCacheBuilder builder = CuratorCache.builder(discovery.getClient(), path); + if ( executorService != null ) + { + builder.withExecutor(executorService::submit); + } + cache = builder.buildBridge(); + + CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache(discovery.getClient(), this).forInitialized(this::initialized).build(); + cache.listenable().addListener(listener); } @Override @@ -94,9 +109,17 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void start() throws Exception { + startImmediate().await(); + } + + @Override + public CountDownLatch startImmediate() throws Exception + { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - cache.start(true); + new EnsureContainers(discovery.getClient(), path).ensure(); + + cache.start(); if ( debugStartLatch != null ) { debugStartLatch.countDown(); @@ -108,14 +131,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi debugStartWaitLatch = null; } - for ( ChildData childData : cache.getCurrentData() ) - { - if ( childData.getData() != null ) // else already processed by the cache listener - { - addInstance(childData, true); - } - } - discovery.cacheOpened(this); + return initializedLatch; } @Override @@ -123,18 +139,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); - listenerContainer.forEach - ( - new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(listener); - return null; - } - } - ); + listenerContainer.forEach(new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; + } + }); listenerContainer.clear(); CloseableUtils.closeQuietly(cache); @@ -166,39 +179,42 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; + boolean notifyListeners = false; switch ( event.getType() ) { - case CHILD_ADDED: - case CHILD_UPDATED: + case CHILD_ADDED: + case CHILD_UPDATED: + { + if ( !event.getData().getPath().equals(path) ) { - addInstance(event.getData(), false); + addInstance(event.getData()); notifyListeners = true; - break; } + break; + } - case CHILD_REMOVED: + case CHILD_REMOVED: + { + if ( !event.getData().getPath().equals(path) ) { instances.remove(instanceIdFromData(event.getData())); notifyListeners = true; - break; } + break; + } } - if ( notifyListeners ) + if ( notifyListeners && (initializedLatch.getCount() == 0) ) { - listenerContainer.forEach - ( - new Function<ServiceCacheListener, Void>() + listenerContainer.forEach(new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - return null; - } + listener.cacheChanged(); + return null; } - ); + }); } } @@ -207,18 +223,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstance(ChildData childData) { - String instanceId = instanceIdFromData(childData); - ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) + try { - instances.putIfAbsent(instanceId, serviceInstance); + String instanceId = instanceIdFromData(childData); + ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + instances.put(instanceId, serviceInstance); } - else + catch ( Exception e ) { - instances.put(instanceId, serviceInstance); + throw new RuntimeException(e); } - cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private void initialized() + { + discovery.cacheOpened(this); + initializedLatch.countDown(); } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 476705c..12b81f4 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -26,8 +26,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; @@ -92,7 +94,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private static class Entry<T> { private volatile ServiceInstance<T> service; - private volatile NodeCache cache; + private volatile CuratorCacheBridge cache; private Entry(ServiceInstance<T> service) { @@ -277,8 +279,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public ServiceCacheBuilder<T> serviceCacheBuilder() { - return new ServiceCacheBuilderImpl<T>(this) - .threadFactory(ThreadUtils.newThreadFactory("ServiceCache")); + return new ServiceCacheBuilderImpl<T>(this); } /** @@ -458,52 +459,47 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> } } - private NodeCache makeNodeCache(final ServiceInstance<T> instance) + private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance) { if ( !watchInstances ) { return null; } - final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId())); - try - { - nodeCache.start(true); - } - catch ( InterruptedException e) - { - Thread.currentThread().interrupt(); - return null; - } - catch ( Exception e ) - { - log.error("Could not start node cache for: " + instance, e); - } - NodeCacheListener listener = new NodeCacheListener() - { - @Override - public void nodeChanged() throws Exception - { - if ( nodeCache.getCurrentData() != null ) + CuratorCacheBridge cache = CuratorCache.builder(client, pathForInstance(instance.getName(), instance.getId())) + .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE) + .buildBridge(); + CuratorCacheListener listener = CuratorCacheListener.builder() + .afterInitialized() + .forAll((__, ___, data) -> { + if ( data != null ) { - ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData()); - Entry<T> entry = services.get(newInstance.getId()); - if ( entry != null ) + try { - synchronized(entry) + ServiceInstance<T> newInstance = serializer.deserialize(data.getData()); + Entry<T> entry = services.get(newInstance.getId()); + if ( entry != null ) { - entry.service = newInstance; + synchronized(entry) + { + entry.service = newInstance; + } } } + catch ( Exception e ) + { + log.debug("Could not deserialize: " + data.getPath()); + } } else { log.warn("Instance data has been deleted for: " + instance); } - } - }; - nodeCache.getListenable().addListener(listener); - return nodeCache; + }) + .build(); + cache.listenable().addListener(listener); + cache.start(); + return cache; } private void internalUnregisterService(final Entry<T> entry) throws Exception diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java index 2ab1434..a411f33 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; /** @@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T> discovery.providerOpened(this); } + @Override + public CountDownLatch startImmediate() throws Exception + { + discovery.providerOpened(this); + return cache.startImmediate(); + } + /** * {@inheritDoc} */ diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26..2a9d2d8 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; @@ -40,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestServiceCache extends BaseClassForTests { @Test diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java index 54719a5..d86e193 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.curator.x.discovery.ServiceDiscovery; @@ -39,6 +40,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestServiceDiscovery extends BaseClassForTests { private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>() diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java index 2d03c47..a96ff6f 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; @@ -35,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestWatchedInstances extends BaseClassForTests { @Test diff --git a/pom.xml b/pom.xml index e223de6..00083c4 100644 --- a/pom.xml +++ b/pom.xml @@ -385,6 +385,13 @@ <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>${project.version}</version> </dependency> <dependency> @@ -400,6 +407,13 @@ </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math</artifactId> <version>${commons-math-version}</version>