This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-cache-zk35 in repository https://gitbox.apache.org/repos/asf/curator.git
commit fab42ae7412640f75cf2b3cb69894faf46e69648 Author: randgalt <[email protected]> AuthorDate: Mon Oct 7 15:48:57 2019 +0300 Created new replacement cache recipe, CuratorCache. Replaces TreeCache, NodeCache and PathChildrenCache when Persistent Watchers are available. --- .../recipes/cache/CuratorCacheBuilderImpl.java | 7 +- .../recipes/cache/CuratorCacheStorage.java | 1 - .../recipes/cache/CuratorTreeCacheBridge.java | 129 +++++++++++++++++++++ 3 files changed, 135 insertions(+), 2 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java index d47d779..1b5cd9c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.Compatibility; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -68,6 +69,10 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder @Override public CuratorCache build() { - return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler); + if ( Compatibility.hasPersistentWatches() ) + { + return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler); + } + return new CuratorTreeCacheBridge(client, path, options, executor, exceptionHandler); } } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java index 34b187f..427c139 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java @@ -21,7 +21,6 @@ package org.apache.curator.framework.recipes.cache; import java.util.AbstractMap; import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java new file mode 100644 index 0000000..dc15767 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java @@ -0,0 +1,129 @@ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.stream.Stream; + +class CuratorTreeCacheBridge implements CuratorCache, CuratorCacheStorage +{ + private final TreeCache treeCache; + private final String path; + + CuratorTreeCacheBridge(CuratorFramework client, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler) + { + Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); + this.path = path; + TreeCache.Builder builder = TreeCache.newBuilder(client, path); + if ( options.contains(Options.SINGLE_NODE_CACHE) ) + { + builder.setMaxDepth(0); + } + if ( options.contains(Options.COMPRESSED_DATA) ) + { + builder.setDataIsCompressed(true); + } + if ( executor != null ) + { + //builder = builder.setExecutor() + } + treeCache = builder.build(); + if ( exceptionHandler != null ) + { + treeCache.getUnhandledErrorListenable().addListener((message, e) -> { + if ( e instanceof Exception ) + { + exceptionHandler.accept((Exception)e); + } + else + { + exceptionHandler.accept(new RuntimeException(e)); + } + }); + } + treeCache.getListenable().addListener((__, event) -> callListeners(event)); + } + + @Override + public void start() + { + try + { + treeCache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public void close() + { + treeCache.close(); + } + + @Override + public CuratorCacheStorage storage() + { + return this; + } + + @Override + public Listenable<CuratorCacheListener> listenable() + { + return null; + } + + @Override + public Optional<ChildData> put(ChildData data) + { + throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support put()"); + } + + @Override + public Optional<ChildData> remove(String path) + { + throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support remove()"); + } + + @Override + public Optional<ChildData> get(String path) + { + return Optional.ofNullable(treeCache.getCurrentData(path)); + } + + @Override + public int size() + { + throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support size()"); + } + + @Override + public Stream<ChildData> stream() + { + throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support stream()"); + } + + @Override + public Stream<ChildData> streamImmediateChildren(String fromParent) + { + return treeCache.getCurrentChildren(path).values().stream(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support clear()"); + } + + private void callListeners(TreeCacheEvent event) + { + // TODO + } +}
