This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge in repository https://gitbox.apache.org/repos/asf/curator.git
commit b0e4f4fd2b2193496337c0d473dfc72ae9888b9a Author: randgalt <randg...@apache.org> AuthorDate: Sun Nov 3 17:53:02 2019 -0500 CURATOR-549 Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, and for earlier versions creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested. --- .../cache/CompatibleCuratorCacheBridge.java | 126 +++++++++++++++++ .../recipes/cache/CuratorCacheBridge.java | 59 ++++++++ .../recipes/cache/CuratorCacheBuilder.java | 8 ++ .../recipes/cache/CuratorCacheBuilderImpl.java | 19 +++ .../framework/recipes/cache/CuratorCacheImpl.java | 9 +- .../recipes/cache/CuratorCacheListenerBuilder.java | 22 ++- .../cache/CuratorCacheListenerBuilderImpl.java | 19 ++- .../recipes/cache/CuratorCacheStorage.java | 1 + .../cache/PathChildrenCacheListenerWrapper.java | 11 +- .../curator/framework/recipes/cache/TreeCache.java | 36 +++-- .../framework/recipes/nodes/GroupMember.java | 43 +++--- .../recipes/cache/TestCuratorCacheWrappers.java | 2 +- .../framework/recipes/nodes/TestGroupMember.java | 2 + curator-test-zk35/pom.xml | 46 ++++++ curator-x-async/pom.xml | 15 ++ .../details/CachedModeledFrameworkImpl.java | 11 +- .../x/async/modeled/details/ModeledCacheImpl.java | 77 +++++++---- .../modeled/details/ModeledFrameworkImpl.java | 4 +- .../async/modeled/TestCachedModeledFramework.java | 2 + .../x/async/modeled/TestModeledFrameworkBase.java | 4 +- curator-x-discovery/pom.xml | 15 ++ .../apache/curator/x/discovery/ServiceCache.java | 20 ++- .../curator/x/discovery/ServiceProvider.java | 24 +++- .../discovery/details/ServiceCacheBuilderImpl.java | 6 +- .../x/discovery/details/ServiceCacheImpl.java | 154 ++++++++++++--------- .../x/discovery/details/ServiceDiscoveryImpl.java | 65 ++++----- .../x/discovery/details/ServiceProviderImpl.java | 8 ++ .../curator/x/discovery/TestServiceCache.java | 2 + .../x/discovery/details/TestServiceCacheRace.java | 2 + .../x/discovery/details/TestServiceDiscovery.java | 2 + .../x/discovery/details/TestWatchedInstances.java | 2 + pom.xml | 14 ++ 32 files changed, 632 insertions(+), 198 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java new file mode 100644 index 0000000..f7c1428 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.StandardListenerManager; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.stream.Stream; + +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*; + +@SuppressWarnings("deprecation") +class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener +{ + private final TreeCache cache; + private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); + + CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor, boolean cacheData) + { + Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); + TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData); + if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) ) + { + builder.setMaxDepth(0); + } + if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) ) + { + builder.setDataIsCompressed(true); + } + if ( executor != null ) + { + builder.setExecutor(executor); + } + cache = builder.build(); + } + + @Override + public void start() + { + try + { + cache.getListenable().addListener(this); + + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public void close() + { + cache.close(); + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return listenerManager; + } + + @Override + public Stream<ChildData> streamImmediateChildren(String fromParent) + { + Map<String, ChildData> currentChildren = cache.getCurrentChildren(fromParent); + if ( currentChildren == null ) + { + return Stream.empty(); + } + return currentChildren.values().stream(); + } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + switch ( event.getType() ) + { + case NODE_ADDED: + { + listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData())); + break; + } + + case NODE_REMOVED: + { + listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null)); + break; + } + + case NODE_UPDATED: + { + listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData())); + break; + } + + case INITIALIZED: + { + listenerManager.forEach(CuratorCacheListener::initialized); + break; + } + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java new file mode 100644 index 0000000..3e329fc --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; +import java.util.stream.Stream; + +/** + * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if + * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache} + * otherwise + */ +@SuppressWarnings("deprecation") +public interface CuratorCacheBridge extends Closeable +{ + /** + * Start the cache. This will cause a complete refresh from the cache's root node and generate + * events for all nodes found, etc. + */ + void start(); + + /** + * Close the cache, stop responding to events, etc. + */ + @Override + void close(); + + /** + * Return the listener container so that listeners can be registered to be notified of changes to the cache + * + * @return listener container + */ + Listenable<CuratorCacheListener> listenable(); + + /** + * Return a stream over the storage entries that are the immediate children of the given node. + * + * @param fromParent the parent node - determines the children returned in the stream + * @return stream over entries + */ + Stream<ChildData> streamImmediateChildren(String fromParent); +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java index 35a5f26..a249d60 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java @@ -60,4 +60,12 @@ public interface CuratorCacheBuilder * @return new Curator Cache */ CuratorCache build(); + + /** + * Return a new bridge cache based on the builder methods that have been called. + * + * @param cacheData if true, keep the data bytes cached. If false, clear them after sending notifications + * @return new bridge cache + */ + CuratorCacheBridge buildBridge(boolean cacheData); } \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java index 9f9e03d..128bf8b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java @@ -19,7 +19,9 @@ package org.apache.curator.framework.recipes.cache; +import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder @Override public CuratorCache build() { + return internalBuild(null); + } + + @Override + public CuratorCacheBridge buildBridge(boolean cacheData) + { + Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge"); + if ( Compatibility.hasPersistentWatchers() ) + { + return internalBuild(cacheData ? null : CuratorCacheStorage.bytesNotCached()); + } + Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge"); + return new CompatibleCuratorCacheBridge(client, path, options, executor, cacheData); + } + + private CuratorCacheImpl internalBuild(CuratorCacheStorage storage) + { return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler); } } \ No newline at end of file diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index ee95570..acbc754 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -39,12 +39,13 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*; import static org.apache.zookeeper.KeeperException.Code.NONODE; import static org.apache.zookeeper.KeeperException.Code.OK; -class CuratorCacheImpl implements CuratorCache +class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { private final Logger log = LoggerFactory.getLogger(getClass()); private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); @@ -112,6 +113,12 @@ class CuratorCacheImpl implements CuratorCache } @Override + public Stream<ChildData> streamImmediateChildren(String fromParent) + { + return storage.streamImmediateChildren(fromParent); + } + + @Override public Listenable<CuratorCacheListener> listenable() { return listenerManager; diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java index c57e881..9381546 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java @@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type; import java.util.function.Consumer; +import java.util.function.Predicate; public interface CuratorCacheListenerBuilder { @@ -88,9 +89,11 @@ public interface CuratorCacheListenerBuilder * * @param client the curator client * @param listener the listener to wrap - * @return a CuratorCacheListener that forwards to the given listener + * @param basePath the path used as the root in the cache. Only events with a parent path matching this + * base path are sent to the listener + * @return this */ - CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener); + CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath); /** * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s @@ -101,7 +104,7 @@ public interface CuratorCacheListenerBuilder * * @param client the curator client * @param listener the listener to wrap - * @return a CuratorCacheListener that forwards to the given listener + * @return this */ CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener); @@ -110,17 +113,28 @@ public interface CuratorCacheListenerBuilder * with CuratorCache. * * @param listener the listener to wrap - * @return a CuratorCacheListener that forwards to the given listener + * @return this */ CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener); /** * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called. * i.e. changes that occur as the cache is initializing are not sent to the listener + * + * @return this */ CuratorCacheListenerBuilder afterInitialized(); /** + * Make the built listener so that it is only called for paths that return true when applied + * to the given filter. + * + * @param pathFilter path filter + * @return this + */ + CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter); + + /** * Build and return a new listener based on the methods that have been previously called * * @return new listener diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java index 4873868..479ba34 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java @@ -20,14 +20,17 @@ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; +import java.util.function.Predicate; class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder { private final List<CuratorCacheListener> listeners = new ArrayList<>(); private boolean afterInitializedOnly = false; + private Predicate<String> pathFilter = __ -> true; @Override public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener) @@ -106,8 +109,9 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder } @Override - public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener) + public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath) { + pathFilter = p -> ZKPaths.getPathAndNode(p).getPath().equals(basePath); listeners.add(new PathChildrenCacheListenerWrapper(client, listener)); return this; } @@ -134,6 +138,13 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder } @Override + public CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter) + { + this.pathFilter = (pathFilter != null) ? (p -> (p == null) || pathFilter.test(p)) : (__ -> true); + return this; + } + + @Override public CuratorCacheListener build() { List<CuratorCacheListener> copy = new ArrayList<>(listeners); @@ -146,7 +157,11 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder { if ( isInitialized ) { - copy.forEach(l -> l.event(type, oldData, data)); + ChildData filterData = (data != null) ? data : oldData; + if ( pathFilter.test(filterData.getPath()) ) + { + copy.forEach(l -> l.event(type, oldData, data)); + } } } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java index e809263..30f2b81 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java @@ -92,6 +92,7 @@ public interface CuratorCacheStorage /** * Return a stream over the storage entries that are the immediate children of the given node. * + * @param fromParent the parent node - determines the children returned in the stream * @return stream over entries */ Stream<ChildData> streamImmediateChildren(String fromParent); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java index a9123c1..7e9730c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java @@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener { case NODE_CREATED: { - sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)); break; } case NODE_CHANGED: { - sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)); break; } case NODE_DELETED: { - sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData)); break; } } @@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener @Override public void initialized() { - sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED); + sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)); } - private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type) + private void sendEvent(PathChildrenCacheEvent event) { - PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node); try { listener.childEvent(client, event); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index e2f3a8b..121e72c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -52,6 +52,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath; public class TreeCache implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + private final Executor executor; private final boolean createParentNodes; private final boolean disableZkWatches; private final TreeCacheSelector selector; @@ -89,6 +91,7 @@ public class TreeCache implements Closeable private boolean cacheData = true; private boolean dataIsCompressed = false; private ExecutorService executorService = null; + private Executor executor = null; private int maxDepth = Integer.MAX_VALUE; private boolean createParentNodes = false; private boolean disableZkWatches = false; @@ -105,12 +108,12 @@ public class TreeCache implements Closeable */ public TreeCache build() { - ExecutorService executor = executorService; - if ( executor == null ) + ExecutorService localExecutorService = executorService; + if ( (localExecutorService == null) && (executor == null) ) { - executor = Executors.newSingleThreadExecutor(defaultThreadFactory); + localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory); } - return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector); + return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector); } /** @@ -149,6 +152,15 @@ public class TreeCache implements Closeable } /** + * Sets the executor to publish events; a default executor will be created if not specified. + */ + public Builder setExecutor(Executor executor) + { + this.executor = checkNotNull(executor); + return this; + } + + /** * Sets the maximum depth to explore/watch. A {@code maxDepth} of {@code 0} will watch only * the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the * root node and its immediate children (kind of like {@link PathChildrenCache}. @@ -564,7 +576,7 @@ public class TreeCache implements Closeable */ public TreeCache(CuratorFramework client, String path) { - this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector()); + this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector()); } /** @@ -573,12 +585,14 @@ public class TreeCache implements Closeable * @param cacheData if true, node contents are cached in addition to the stat * @param dataIsCompressed if true, data in the path is compressed * @param executorService Closeable ExecutorService to use for the TreeCache's background thread + * @param executor executor to use for the TreeCache's background thread * @param createParentNodes true to create parent nodes as containers * @param disableZkWatches true to disable Zookeeper watches * @param selector the selector to use */ - TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector) + TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector) { + this.executor = executor; this.createParentNodes = createParentNodes; this.selector = Preconditions.checkNotNull(selector, "selector cannot be null"); this.root = new TreeNode(validatePath(path), null); @@ -588,7 +602,7 @@ public class TreeCache implements Closeable this.dataIsCompressed = dataIsCompressed; this.maxDepth = maxDepth; this.disableZkWatches = disableZkWatches; - this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null"); + this.executorService = executorService; } /** @@ -623,7 +637,10 @@ public class TreeCache implements Closeable client.removeWatchers(); client.getConnectionStateListenable().removeListener(connectionStateListener); listeners.clear(); - executorService.shutdown(); + if ( executorService != null ) + { + executorService.shutdown(); + } try { root.wasDeleted(); @@ -857,8 +874,9 @@ public class TreeCache implements Closeable { if ( treeState.get() != TreeState.CLOSED ) { + Executor localExecutor = (executorService != null) ? executorService : executor; LOG.debug("publishEvent: {}", event); - executorService.submit(new Runnable() + localExecutor.execute(new Runnable() { @Override public void run() diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java index 8cd1f65..b80c71e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -20,16 +20,19 @@ package org.apache.curator.framework.recipes.nodes; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; import java.io.Closeable; +import java.util.AbstractMap; +import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; /** * Group membership management. Adds this instance into a group and @@ -37,8 +40,9 @@ import java.util.Map; */ public class GroupMember implements Closeable { - private final PersistentEphemeralNode pen; - private final PathChildrenCache cache; + private final PersistentNode pen; + private final CuratorCacheBridge cache; + private final String membershipPath; private final String thisId; /** @@ -59,9 +63,10 @@ public class GroupMember implements Closeable */ public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload) { + this.membershipPath = membershipPath; this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); - cache = newPathChildrenCache(client, membershipPath); + cache = newCache(client, membershipPath); pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload); } @@ -119,19 +124,11 @@ public class GroupMember implements Closeable */ public Map<String, byte[]> getCurrentMembers() { - ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); - boolean thisIdAdded = false; - for ( ChildData data : cache.getCurrentData() ) - { - String id = idFromPath(data.getPath()); - thisIdAdded = thisIdAdded || id.equals(thisId); - builder.put(id, data.getData()); - } - if ( !thisIdAdded ) - { - builder.put(thisId, pen.getData()); // this instance is always a member - } - return builder.build(); + Map<String, byte[]> map = new HashMap<>(); + map.put(thisId, pen.getData()); + return cache.streamImmediateChildren(membershipPath) + .map(data -> new AbstractMap.SimpleEntry<>(idFromPath(data.getPath()), data.getData())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k, v) -> v, () -> map)); } /** @@ -145,13 +142,13 @@ public class GroupMember implements Closeable return ZKPaths.getNodeFromPath(path); } - protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload) + protected PersistentNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload) { - return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload); + return new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload); } - protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath) + protected CuratorCacheBridge newCache(CuratorFramework client, String membershipPath) { - return new PathChildrenCache(client, membershipPath, true); + return CuratorCache.builder(client, membershipPath).buildBridge(true); } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java index 4a75acf..edfecb5 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java @@ -56,7 +56,7 @@ public class TestCuratorCacheWrappers extends CuratorTestBase events.offer(event.getType()); } }; - cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build()); + cache.listenable().addListener(builder().forPathChildrenCache(client, listener, "/test").build()); cache.start(); client.create().forPath("/test/one", "hey there".getBytes()); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java index 2da051f..85a2097 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java @@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Map; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestGroupMember extends BaseClassForTests { // NOTE - don't need many tests as this class is just a wrapper around two existing recipes diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml index 5803893..4ca8356 100644 --- a/curator-test-zk35/pom.xml +++ b/curator-test-zk35/pom.xml @@ -89,6 +89,18 @@ <dependency> <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <exclusions> <exclusion> @@ -114,6 +126,32 @@ </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <scope>test</scope> @@ -124,6 +162,12 @@ <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -151,6 +195,8 @@ <dependenciesToScan> <dependency>org.apache.curator:curator-framework</dependency> <dependency>org.apache.curator:curator-recipes</dependency> + <dependency>org.apache.curator:curator-x-async</dependency> + <dependency>org.apache.curator:curator-x-discovery</dependency> </dependenciesToScan> <groups>zk35,zk35Compatibility</groups> <excludedGroups>zk36</excludedGroups> diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml index 5ffd774..a32dbd3 100644 --- a/curator-x-async/pom.xml +++ b/curator-x-async/pom.xml @@ -49,4 +49,19 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index c897b4e..2dd5625 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { private final ModeledFramework<T> client; private final ModeledCacheImpl<T> cache; - private final Executor executor; CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor)); } - private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache) { this.client = client; this.cache = cache; - this.executor = executor; } @Override @@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> child(Object child) { - return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.child(child), cache); } @Override @@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache); } @Override diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index b95e92d..6fe866d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -19,34 +19,42 @@ package org.apache.curator.x.async.modeled.details; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.ModeledCache; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import org.apache.curator.x.async.modeled.ZNode; import org.apache.zookeeper.data.Stat; import java.util.AbstractMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> { - private final TreeCache cache; + private final CuratorCacheBridge cache; private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>(); private final ZPath basePath; + private final CuratorFramework client; + private final Set<CreateOption> options; private static final class Entry<T> { @@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor) { + this.client = client; if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() ) { modelSpec = modelSpec.parent(); // i.e. the last item is a parameter @@ -69,19 +78,40 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> basePath = modelSpec.path(); this.serializer = modelSpec.serializer(); - cache = TreeCache.newBuilder(client, basePath.fullPath()) - .setCacheData(false) - .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) - .setExecutor(executor) - .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) - .build(); + options = modelSpec.createOptions(); + CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath()); + if ( modelSpec.createOptions().contains(CreateOption.compress) ) + { + builder.withOptions(CuratorCache.Options.COMPRESSED_DATA); + } + if ( executor != null ) + { + builder.withExecutor(executor); + } + cache = builder.buildBridge(false); } public void start() { + CuratorCacheListener listener = CuratorCacheListener.builder() + .forTreeCache(client, this) + .withPathFilter(path -> !path.equals(basePath.fullPath())) + .build(); try { - cache.getListenable().addListener(this); + if ( options.contains(CreateOption.createParentsIfNeeded) ) + { + if ( options.contains(CreateOption.createParentsAsContainers) ) + { + new EnsureContainers(client, basePath.fullPath()).ensure(); + } + else + { + ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false); + } + } + + cache.listenable().addListener(listener); cache.start(); } catch ( Exception e ) @@ -92,7 +122,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> public void close() { - cache.getListenable().removeListener(this); cache.close(); entries.clear(); } @@ -159,16 +188,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> case NODE_UPDATED: { ZPath path = ZPath.parse(event.getData().getPath()); - if ( !path.equals(basePath) ) + byte[] bytes = event.getData().getData(); + if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created { - byte[] bytes = event.getData().getData(); - if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created - { - T model = serializer.deserialize(bytes); - entries.put(path, new Entry<>(event.getData().getStat(), model)); - ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; - accept(type, path, event.getData().getStat(), model); - } + T model = serializer.deserialize(bytes); + entries.put(path, new Entry<>(event.getData().getStat(), model)); + ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; + accept(type, path, event.getData().getStat(), model); } break; } @@ -176,13 +202,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> case NODE_REMOVED: { ZPath path = ZPath.parse(event.getData().getPath()); - if ( !path.equals(basePath) ) - { - Entry<T> entry = entries.remove(path); - T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); - Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); - accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model); - } + Entry<T> entry = entries.remove(path); + T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); + Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); + accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model); break; } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index dbbf3cb..799845f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public CachedModeledFramework<T> cached() { - return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework")); + return cached(null); } @Override public CachedModeledFramework<T> cached(ExecutorService executor) { Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers."); - return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null")); + return new CachedModeledFrameworkImpl<>(this, null); } @Override diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 2d33c13..830ee2b 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled; import com.google.common.collect.Sets; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; @@ -38,6 +39,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestCachedModeledFramework extends TestModeledFrameworkBase { @Test diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java index 61a4570..5660539 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java @@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests protected ModelSpec<TestNewerModel> newModelSpec; protected AsyncCuratorFramework async; - @BeforeMethod + @BeforeMethod(alwaysRun = true) @Override public void setup() throws Exception { @@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests newModelSpec = ModelSpec.builder(path, newSerializer).build(); } - @AfterMethod + @AfterMethod(alwaysRun = true) @Override public void teardown() throws Exception { diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml index 824231d..50b2dea 100644 --- a/curator-x-discovery/pom.xml +++ b/curator-x-discovery/pom.xml @@ -80,4 +80,19 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d69..270005e 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; +import java.util.concurrent.CountDownLatch; public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T> { @@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe * * @return the list */ - public List<ServiceInstance<T>> getInstances(); + List<ServiceInstance<T>> getInstances(); /** - * The cache must be started before use + * The cache must be started before use. This method blocks while the internal + * cache is loaded. * * @throws Exception errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * The cache must be started before use. This version returns immediately. + * Use the returned latch to block until the cache is loaded + * + * @return a latch that can be used to block until the cache is loaded + * @throws Exception errors + */ + default CountDownLatch startImmediate() throws Exception + { + throw new UnsupportedOperationException(); + } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java index f542ed3..d09885b 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java @@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider; import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.CountDownLatch; /** * The main API for Discovery. This class is essentially a facade over a {@link ProviderStrategy} @@ -31,11 +32,24 @@ import java.util.Collection; public interface ServiceProvider<T> extends Closeable { /** - * The provider must be started before use + * The provider must be started before use. This method blocks while the internal + * cache is loaded. * * @throws Exception any errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * The provider must be started before use. This version returns immediately. + * Use the returned latch to block until the cache is loaded + * + * @return a latch that can be used to block until the cache is loaded + * @throws Exception errors + */ + default CountDownLatch startImmediate() throws Exception + { + throw new UnsupportedOperationException(); + } /** * Return an instance for a single use. <b>IMPORTANT: </b> users @@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable * @return the instance to use * @throws Exception any errors */ - public ServiceInstance<T> getInstance() throws Exception; + ServiceInstance<T> getInstance() throws Exception; /** * Return the current available set of instances <b>IMPORTANT: </b> users @@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable * @return all known instances * @throws Exception any errors */ - public Collection<ServiceInstance<T>> getAllInstances() throws Exception; + Collection<ServiceInstance<T>> getAllInstances() throws Exception; /** * Take note of an error connecting to the given instance. The instance will potentially @@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable * * @param instance instance that had an error */ - public void noteError(ServiceInstance<T> instance); + void noteError(ServiceInstance<T> instance); /** * Close the provider. Note: it's the provider's responsibility to close any caches it manages diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java index 8922233..501e289 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java @@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> @Override public ServiceCache<T> build() { - if (executorService != null) + if (threadFactory != null) { - return new ServiceCacheImpl<T>(discovery, name, executorService); + return new ServiceCacheImpl<T>(discovery, name, threadFactory); } else { - return new ServiceCacheImpl<T>(discovery, name, threadFactory); + return new ServiceCacheImpl<T>(discovery, name, executorService); } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index d1a31ad..6ff68f1 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; @@ -23,14 +24,18 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.curator.utils.CloseableExecutorService; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; @@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference; public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { - private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); - private final ServiceDiscoveryImpl<T> discovery; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - private final PathChildrenCache cache; - private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); + private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); + private final ServiceDiscoveryImpl<T> discovery; + private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final CuratorCacheBridge cache; + private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); + private final CountDownLatch initializedLatch = new CountDownLatch(1); + private String path; private enum State { - LATENT, - STARTED, - STOPPED + LATENT, STARTED, STOPPED } private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) @@ -73,12 +78,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); - Preconditions.checkNotNull(executorService, "executorService cannot be null"); this.discovery = discovery; - cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService); - cache.getListenable().addListener(this); + path = discovery.pathForName(name); + + CuratorCacheBuilder builder = CuratorCache.builder(discovery.getClient(), path); + if ( executorService != null ) + { + builder.withExecutor(executorService::submit); + } + cache = builder.buildBridge(false); + + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(discovery.getClient(), this, path) + .forInitialized(this::initialized) + .withPathFilter(p -> !p.equals(path)) + .build(); + cache.listenable().addListener(listener); } @Override @@ -94,11 +111,20 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void start() throws Exception { + startImmediate().await(); + } + + @Override + public CountDownLatch startImmediate() throws Exception + { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - cache.start(true); + new EnsureContainers(discovery.getClient(), path).ensure(); + + cache.start(); if ( debugStartLatch != null ) { + initializedLatch.await(); debugStartLatch.countDown(); debugStartLatch = null; } @@ -108,14 +134,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi debugStartWaitLatch = null; } - for ( ChildData childData : cache.getCurrentData() ) - { - if ( childData.getData() != null ) // else already processed by the cache listener - { - addInstance(childData, true); - } - } - discovery.cacheOpened(this); + return initializedLatch; } @Override @@ -123,18 +142,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); - listenerContainer.forEach - ( - new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(listener); - return null; - } - } - ); + listenerContainer.forEach(new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; + } + }); listenerContainer.clear(); CloseableUtils.closeQuietly(cache); @@ -166,39 +182,36 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; + boolean notifyListeners = false; switch ( event.getType() ) { - case CHILD_ADDED: - case CHILD_UPDATED: - { - addInstance(event.getData(), false); - notifyListeners = true; - break; - } + case CHILD_ADDED: + case CHILD_UPDATED: + { + addInstance(event.getData()); + notifyListeners = true; + break; + } - case CHILD_REMOVED: - { - instances.remove(instanceIdFromData(event.getData())); - notifyListeners = true; - break; - } + case CHILD_REMOVED: + { + instances.remove(instanceIdFromData(event.getData())); + notifyListeners = true; + break; + } } - if ( notifyListeners ) + if ( notifyListeners && (initializedLatch.getCount() == 0) ) { - listenerContainer.forEach - ( - new Function<ServiceCacheListener, Void>() + listenerContainer.forEach(new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - return null; - } + listener.cacheChanged(); + return null; } - ); + }); } } @@ -207,18 +220,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstance(ChildData childData) { - String instanceId = instanceIdFromData(childData); - ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) + try { - instances.putIfAbsent(instanceId, serviceInstance); + String instanceId = instanceIdFromData(childData); + ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + instances.put(instanceId, serviceInstance); } - else + catch ( Exception e ) { - instances.put(instanceId, serviceInstance); + throw new RuntimeException(e); } - cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private void initialized() + { + discovery.cacheOpened(this); + initializedLatch.countDown(); } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 476705c..13ae887 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -26,8 +26,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; @@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private static class Entry<T> { private volatile ServiceInstance<T> service; - private volatile NodeCache cache; + private volatile CuratorCacheBridge cache; private Entry(ServiceInstance<T> service) { @@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public ServiceCacheBuilder<T> serviceCacheBuilder() { - return new ServiceCacheBuilderImpl<T>(this) - .threadFactory(ThreadUtils.newThreadFactory("ServiceCache")); + return new ServiceCacheBuilderImpl<T>(this); } /** @@ -458,52 +458,47 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> } } - private NodeCache makeNodeCache(final ServiceInstance<T> instance) + private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance) { if ( !watchInstances ) { return null; } - final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId())); - try - { - nodeCache.start(true); - } - catch ( InterruptedException e) - { - Thread.currentThread().interrupt(); - return null; - } - catch ( Exception e ) - { - log.error("Could not start node cache for: " + instance, e); - } - NodeCacheListener listener = new NodeCacheListener() - { - @Override - public void nodeChanged() throws Exception - { - if ( nodeCache.getCurrentData() != null ) + CuratorCacheBridge cache = CuratorCache.builder(client, pathForInstance(instance.getName(), instance.getId())) + .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE) + .buildBridge(false); + CuratorCacheListener listener = CuratorCacheListener.builder() + .afterInitialized() + .forAll((__, ___, data) -> { + if ( data != null ) { - ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData()); - Entry<T> entry = services.get(newInstance.getId()); - if ( entry != null ) + try { - synchronized(entry) + ServiceInstance<T> newInstance = serializer.deserialize(data.getData()); + Entry<T> entry = services.get(newInstance.getId()); + if ( entry != null ) { - entry.service = newInstance; + synchronized(entry) + { + entry.service = newInstance; + } } } + catch ( Exception e ) + { + log.debug("Could not deserialize: " + data.getPath()); + } } else { log.warn("Instance data has been deleted for: " + instance); } - } - }; - nodeCache.getListenable().addListener(listener); - return nodeCache; + }) + .build(); + cache.listenable().addListener(listener); + cache.start(); + return cache; } private void internalUnregisterService(final Entry<T> entry) throws Exception diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java index 2ab1434..a411f33 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; /** @@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T> discovery.providerOpened(this); } + @Override + public CountDownLatch startImmediate() throws Exception + { + discovery.providerOpened(this); + return cache.startImmediate(); + } + /** * {@inheritDoc} */ diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26..2a9d2d8 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; @@ -40,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestServiceCache extends BaseClassForTests { @Test diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java index 06d63b9..e9a7956 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceCache; @@ -40,6 +41,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestServiceCacheRace extends BaseClassForTests { private final Timing timing = new Timing(); diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java index 54719a5..d86e193 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.curator.x.discovery.ServiceDiscovery; @@ -39,6 +40,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestServiceDiscovery extends BaseClassForTests { private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>() diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java index 2d03c47..a96ff6f 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; @@ -35,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestWatchedInstances extends BaseClassForTests { @Test diff --git a/pom.xml b/pom.xml index e223de6..00083c4 100644 --- a/pom.xml +++ b/pom.xml @@ -385,6 +385,13 @@ <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>${project.version}</version> </dependency> <dependency> @@ -400,6 +407,13 @@ </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math</artifactId> <version>${commons-math-version}</version>