wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2ebd4569 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2ebd4569 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2ebd4569 Branch: refs/heads/CURATOR-470 Commit: 2ebd456930d5970380d68af9c6b7150a2cf24abf Parents: 6b1522c Author: randgalt <[email protected]> Authored: Sun Jun 24 10:21:04 2018 -0500 Committer: randgalt <[email protected]> Committed: Sun Jun 24 10:21:04 2018 -0500 ---------------------------------------------------------------------- .../curator/x/discovery/ServiceCache.java | 12 +- .../details/ServiceCacheEventListener.java | 10 +- .../x/discovery/details/ServiceCacheImpl.java | 140 +++++++++++-------- .../details/ServiceCacheListenerWrapper.java | 53 +++++++ .../curator/x/discovery/TestServiceCache.java | 13 +- 5 files changed, 150 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d69..0214bf8 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -20,6 +20,7 @@ package org.apache.curator.x.discovery; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.x.discovery.details.InstanceProvider; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; @@ -33,12 +34,19 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe * * @return the list */ - public List<ServiceInstance<T>> getInstances(); + List<ServiceInstance<T>> getInstances(); /** * The cache must be started before use * * @throws Exception errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * Returns the listenable container over the newer {@link org.apache.curator.x.discovery.details.ServiceCacheEventListener} + * + * @return listenable + */ + Listenable<ServiceCacheEventListener<T>> getCacheEventListenable(); } http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java index 1f783f3..e6345c4 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -18,27 +18,27 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.x.discovery.ServiceInstance; /** * Listener for events (addition/update/deletion) that happen to a service cache */ -public interface ServiceCacheEventListener<T> extends ServiceCacheListener +public interface ServiceCacheEventListener<T> extends ConnectionStateListener { - /** * Called when a new cache is added. * * @param added instance added */ - public void cacheAdded(ServiceInstance<T> added); + void cacheAdded(ServiceInstance<T> added); /** * Called when a cache is deleted. * * @param deleted instance deleted */ - public void cacheDeleted(ServiceInstance<T> deleted); + void cacheDeleted(ServiceInstance<T> deleted); /** * Called when a cache is updated. @@ -46,5 +46,5 @@ public interface ServiceCacheEventListener<T> extends ServiceCacheListener * @param old old instance * @param updated updated instance */ - public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated); + void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated); } http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index ffaf1a4..47449e8 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.framework.listen.Listenable; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -36,6 +37,7 @@ import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -45,9 +47,9 @@ import java.util.concurrent.atomic.AtomicReference; public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { - private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); + private final ListenerContainer<ServiceCacheEventListener<T>> listenerContainer = new ListenerContainer<>(); private final ServiceDiscoveryImpl<T> discovery; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); private final PathChildrenCache cache; private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); @@ -125,10 +127,10 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi listenerContainer.forEach ( - new Function<ServiceCacheListener, Void>() + new Function<ServiceCacheEventListener<T>, Void>() { @Override - public Void apply(ServiceCacheListener listener) + public Void apply(ServiceCacheEventListener<T> listener) { discovery.getClient().getConnectionStateListenable().removeListener(listener); return null; @@ -143,98 +145,114 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi } @Override + public Listenable<ServiceCacheEventListener<T>> getCacheEventListenable() + { + return listenerContainer; + } + + @Override public void addListener(ServiceCacheListener listener) { - listenerContainer.addListener(listener); - discovery.getClient().getConnectionStateListenable().addListener(listener); + ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener); + listenerContainer.addListener(wrapped); + discovery.getClient().getConnectionStateListenable().addListener(wrapped); } @Override public void addListener(ServiceCacheListener listener, Executor executor) { - listenerContainer.addListener(listener, executor); - discovery.getClient().getConnectionStateListenable().addListener(listener, executor); + ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener); + listenerContainer.addListener(wrapped, executor); + discovery.getClient().getConnectionStateListenable().addListener(wrapped, executor); } @Override - public void removeListener(ServiceCacheListener listener) + public void removeListener(final ServiceCacheListener listener) { - listenerContainer.removeListener(listener); - discovery.getClient().getConnectionStateListenable().removeListener(listener); + listenerContainer.forEach + ( + new Function<ServiceCacheEventListener<T>, Void>() + { + @Override + public Void apply(ServiceCacheEventListener<T> eventListener) + { + if ( Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) ) + { + listenerContainer.removeListener(eventListener); + discovery.getClient().getConnectionStateListenable().removeListener(eventListener); + } + return null; + } + } + ); } @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final Tuple<T> tuple; switch ( event.getType() ) { case CHILD_ADDED: - tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach( - new Function<ServiceCacheListener, Void>() + { + final Tuple<T> tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach + ( + new Function<ServiceCacheEventListener<T>, Void>() { @Override - public Void apply(ServiceCacheListener listener) + public Void apply(ServiceCacheEventListener<T> listener) { - listener.cacheChanged(); + listener.cacheAdded(tuple.newInstance); + return null; + } + } + ); + break; + } - if ( listener instanceof ServiceCacheEventListener ) + case CHILD_UPDATED: + { + final Tuple<T> tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach + ( + new Function<ServiceCacheEventListener<T>, Void>() + { + @Override + public Void apply(ServiceCacheEventListener<T> listener) + { + if ( tuple.oldInstance != null ) { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance); + listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + else + { + listener.cacheAdded(tuple.newInstance); } - return null; } } ); break; - case CHILD_UPDATED: - { - tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach( - new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - - if( listener instanceof ServiceCacheEventListener ) - { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance); - } - - return null; - } - } - ); - break; } case CHILD_REMOVED: { final ServiceInstance<T> serviceInstance = instances.remove(instanceIdFromData(event.getData())); - listenerContainer.forEach( - new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - - if ( listener instanceof ServiceCacheEventListener ) - { - //noinspection unchecked - ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance); - } - - return null; - } - } - ); + if ( serviceInstance != null ) + { + listenerContainer.forEach + ( + new Function<ServiceCacheEventListener<T>, Void>() + { + @Override + public Void apply(ServiceCacheEventListener<T> listener) + { + listener.cacheDeleted(serviceInstance); + return null; + } + } + ); + } break; } } http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java new file mode 100644 index 0000000..6f14178 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java @@ -0,0 +1,53 @@ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.x.discovery.ServiceInstance; + +class ServiceCacheListenerWrapper<T> implements ServiceCacheEventListener<T> +{ + private final ServiceCacheListener listener; + + static <T> ServiceCacheListenerWrapper<T> wrap(ServiceCacheListener listener) + { + return new ServiceCacheListenerWrapper<>(listener); + } + + static ServiceCacheListener unwrap(ServiceCacheEventListener<?> eventListener) + { + if ( eventListener instanceof ServiceCacheListenerWrapper ) + { + return ((ServiceCacheListenerWrapper)eventListener).listener; + } + return null; + } + + private ServiceCacheListenerWrapper(ServiceCacheListener listener) + { + this.listener = listener; + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + listener.stateChanged(client, newState); + } + + @Override + public void cacheAdded(ServiceInstance<T> added) + { + listener.cacheChanged(); + } + + @Override + public void cacheDeleted(ServiceInstance<T> deleted) + { + listener.cacheChanged(); + } + + @Override + public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated) + { + listener.cacheChanged(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index 65b7cb9..70b41e3 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -330,10 +330,10 @@ public class TestServiceCache extends BaseClassForTests ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build(); closeables.add(cache); - final CountDownLatch latch = new CountDownLatch(6); + final CountDownLatch latch = new CountDownLatch(3); final AtomicBoolean notifyError = new AtomicBoolean(false); - ServiceCacheListener listener = new ServiceCacheEventListener<String>() + ServiceCacheEventListener<String> listener = new ServiceCacheEventListener<String>() { @Override public void cacheAdded(final ServiceInstance<String> added) { @@ -358,18 +358,11 @@ public class TestServiceCache extends BaseClassForTests } @Override - public void cacheChanged() - { - latch.countDown(); - } - - @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - } }; - cache.addListener(listener); + cache.getCacheEventListenable().addListener(listener); cache.start(); ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("before").name("test").port(10064).build();
