This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-bridge in repository https://gitbox.apache.org/repos/asf/curator.git
commit 492eac6b397f9fb2d4ffd7148ef2b45234b548c4 Author: randgalt <[email protected]> AuthorDate: Thu Oct 24 22:05:25 2019 +0300 wip --- .../framework/recipes/cache/CacheBridge.java | 39 ++++++++++ .../recipes/cache/CuratorCacheBridge.java | 63 ++++++++++++++++ .../framework/recipes/cache/NodeCacheBridge.java | 82 ++++++++++++++++++++ .../recipes/cache/PathChildrenCacheBridge.java | 88 ++++++++++++++++++++++ .../framework/recipes/cache/TreeCacheBridge.java | 78 +++++++++++++++++++ .../details/CachedModeledFrameworkImpl.java | 11 +-- .../x/async/modeled/details/ModeledCacheImpl.java | 14 +++- 7 files changed, 364 insertions(+), 11 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java new file mode 100644 index 0000000..b56f873 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java @@ -0,0 +1,39 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public interface CacheBridge extends Closeable +{ + void addListener(TreeCacheListener listener); + + void addListener(NodeCacheListener listener); + + void addListener(PathChildrenCacheListener listener); + + void start(); + + Optional<ChildData> currentData(); + + Optional<ChildData> currentData(String fullPath); + + @Override + void close(); + + static CacheBridge newPathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData) + { + return PathChildrenCacheBridge.build(client, path, cacheData); + } + + static CacheBridge newTreeCacheBridge(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor) + { + return TreeCacheBridge.build(client, path, cacheData, compressedData, createParentsIfNeeded, createParentsAsContainers, executor); + } + + static CacheBridge newNodeCacheBridge(CuratorFramework client, String path) + { + return NodeCacheBridge.build(client, path); + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java new file mode 100644 index 0000000..ddf663d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java @@ -0,0 +1,63 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import java.util.Optional; + +class CuratorCacheBridge implements CacheBridge +{ + private final CuratorCache cache; + private final CuratorFramework client; + + CuratorCacheBridge(CuratorFramework client, CuratorCacheStorage storage, String path, CuratorCache.Options... options) + { + this.client = client; + cache = CuratorCache.build(client, storage, path, options); + } + + @Override + public void addListener(TreeCacheListener listener) + { + cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client, listener).build()); + } + + @Override + public void addListener(NodeCacheListener listener) + { + cache.listenable().addListener(CuratorCacheListener.builder().forNodeCache(listener).build()); + } + + @Override + public void addListener(PathChildrenCacheListener listener) + { + cache.listenable().addListener(CuratorCacheListener.builder().forPathChildrenCache(client, listener).build()); + } + + @Override + public void start() + { + cache.start(); + } + + @Override + public Optional<ChildData> currentData() + { + return cache.getRootData(); + } + + @Override + public Optional<ChildData> currentData(String fullPath) + { + return cache.storage().get(fullPath); + } + + @Override + public void close() + { + cache.close(); + } + + CuratorCache cache() + { + return cache; + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java new file mode 100644 index 0000000..b4debd3 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java @@ -0,0 +1,82 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; +import java.io.IOException; +import java.util.Optional; + +@SuppressWarnings("deprecation") +class NodeCacheBridge implements CacheBridge +{ + private final NodeCache cache; + + static CacheBridge build(CuratorFramework client, String path) + { + if ( Compatibility.hasPersistentWatches() ) + { + return new CuratorCacheBridge(client, CuratorCacheStorage.standard(), path); + } + return new NodeCacheBridge(client, path); + } + + private NodeCacheBridge(CuratorFramework client, String path) + { + cache = new NodeCache(client, path); + } + + @Override + public void addListener(TreeCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void addListener(NodeCacheListener listener) + { + cache.getListenable().addListener(listener); + } + + @Override + public void addListener(PathChildrenCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void start() + { + try + { + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public Optional<ChildData> currentData() + { + return Optional.ofNullable(cache.getCurrentData()); + } + + @Override + public Optional<ChildData> currentData(String fullPath) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + try + { + cache.close(); + } + catch ( IOException e ) + { + throw new RuntimeException(e); + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java new file mode 100644 index 0000000..cf1b4e0 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java @@ -0,0 +1,88 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; +import org.apache.curator.utils.ZKPaths; +import java.io.IOException; +import java.util.Optional; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE; + +@SuppressWarnings("deprecation") +class PathChildrenCacheBridge implements CacheBridge +{ + private final PathChildrenCache cache; + + static CacheBridge build(CuratorFramework client, String path, boolean cacheData) + { + if ( Compatibility.hasPersistentWatches() ) + { + CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached(); + CuratorCacheBridge curatorCache = new CuratorCacheBridge(client, storage, path, RECURSIVE); + curatorCache.cache().setPathFilter(filtering -> ZKPaths.getPathAndNode(filtering).getPath().equals(path)); // mimic PathChildrenCache which only caches the children of the main path + return curatorCache; + } + return new PathChildrenCacheBridge(client, path, cacheData); + } + + private PathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData) + { + cache = new PathChildrenCache(client, path, cacheData); + } + + @Override + public void addListener(TreeCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void addListener(NodeCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void addListener(PathChildrenCacheListener listener) + { + cache.getListenable().addListener(listener); + } + + @Override + public void start() + { + try + { + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public Optional<ChildData> currentData() + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<ChildData> currentData(String fullPath) + { + return Optional.ofNullable(cache.getCurrentData(fullPath)); + } + + @Override + public void close() + { + try + { + cache.close(); + } + catch ( IOException e ) + { + throw new RuntimeException(); + } + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java new file mode 100644 index 0000000..1b173b8 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java @@ -0,0 +1,78 @@ +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE; + +@SuppressWarnings("deprecation") +class TreeCacheBridge implements CacheBridge +{ + private final TreeCache cache; + + static CacheBridge build(CuratorFramework client, String path, boolean cacheData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor) + { + if ( Compatibility.hasPersistentWatches() ) + { + CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached(); + return new CuratorCacheBridge(client, storage, path, RECURSIVE); + } + return new TreeCacheBridge(client, path, cacheData); + } + + private TreeCacheBridge(CuratorFramework client, String path, boolean cacheData) + { + cache = TreeCache.newBuilder(client, path).setCacheData(cacheData).build(); + } + + @Override + public void addListener(TreeCacheListener listener) + { + cache.getListenable().addListener(listener); + } + + @Override + public void addListener(NodeCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void addListener(PathChildrenCacheListener listener) + { + throw new UnsupportedOperationException(); + } + + @Override + public void start() + { + try + { + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public Optional<ChildData> currentData() + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<ChildData> currentData(String fullPath) + { + return Optional.ofNullable(cache.getCurrentData(fullPath)); + } + + @Override + public void close() + { + cache.close(); + } +} diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index c897b4e..2dd5625 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { private final ModeledFramework<T> client; private final ModeledCacheImpl<T> cache; - private final Executor executor; CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor)); } - private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache) { this.client = client; this.cache = cache; - this.executor = executor; } @Override @@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> child(Object child) { - return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.child(child), cache); } @Override @@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache); } @Override diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index b95e92d..68baf44 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.details; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.cache.CacheBridge; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; @@ -42,7 +43,7 @@ import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> { - private final TreeCache cache; + private final CacheBridge cache; private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>(); @@ -69,19 +70,25 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> basePath = modelSpec.path(); this.serializer = modelSpec.serializer(); - cache = TreeCache.newBuilder(client, basePath.fullPath()) + boolean compress = modelSpec.createOptions().contains(CreateOption.compress); + boolean createParentsIfNeeded = modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded); + boolean createParentsAsContainers = modelSpec.createOptions().contains(CreateOption.createParentsAsContainers); + cache = CacheBridge.newTreeCacheBridge(client, basePath.fullPath(), false, compress, createParentsIfNeeded, createParentsAsContainers, executor); +/* + TreeCache.newBuilder(client, basePath.fullPath()) .setCacheData(false) .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) .setExecutor(executor) .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) .build(); +*/ } public void start() { try { - cache.getListenable().addListener(this); + cache.addListener(this); cache.start(); } catch ( Exception e ) @@ -92,7 +99,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> public void close() { - cache.getListenable().removeListener(this); cache.close(); entries.clear(); }
