wip

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2ebd4569
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2ebd4569
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2ebd4569

Branch: refs/heads/CURATOR-470
Commit: 2ebd456930d5970380d68af9c6b7150a2cf24abf
Parents: 6b1522c
Author: randgalt <[email protected]>
Authored: Sun Jun 24 10:21:04 2018 -0500
Committer: randgalt <[email protected]>
Committed: Sun Jun 24 10:21:04 2018 -0500

----------------------------------------------------------------------
 .../curator/x/discovery/ServiceCache.java       |  12 +-
 .../details/ServiceCacheEventListener.java      |  10 +-
 .../x/discovery/details/ServiceCacheImpl.java   | 140 +++++++++++--------
 .../details/ServiceCacheListenerWrapper.java    |  53 +++++++
 .../curator/x/discovery/TestServiceCache.java   |  13 +-
 5 files changed, 150 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..0214bf8 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.discovery;
 
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.x.discovery.details.InstanceProvider;
+import org.apache.curator.x.discovery.details.ServiceCacheEventListener;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
@@ -33,12 +34,19 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
      * The cache must be started before use
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * Returns the listenable container over the newer {@link org.apache.curator.x.discovery.details.ServiceCacheEventListener}
+     *
+     * @return listenable
+     */
+    Listenable<ServiceCacheEventListener<T>> getCacheEventListenable();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
index 1f783f3..e6345c4 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
@@ -18,27 +18,27 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.x.discovery.ServiceInstance;
 
 /**
  * Listener for events (addition/update/deletion) that happen to a service cache
  */
-public interface ServiceCacheEventListener<T> extends ServiceCacheListener
+public interface ServiceCacheEventListener<T> extends ConnectionStateListener
 {
-
     /**
      * Called when a new cache is added.
      *
      * @param added instance added
      */
-    public void cacheAdded(ServiceInstance<T> added);
+    void cacheAdded(ServiceInstance<T> added);
 
     /**
      * Called when a cache is deleted.
      *
      * @param deleted instance deleted
      */
-    public void cacheDeleted(ServiceInstance<T> deleted);
+    void cacheDeleted(ServiceInstance<T> deleted);
 
     /**
      * Called when a cache is updated.
@@ -46,5 +46,5 @@ public interface ServiceCacheEventListener<T> extends ServiceCacheListener
      * @param old old instance
      * @param updated updated instance
      */
-    public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated);
+    void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index ffaf1a4..47449e8 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -36,6 +37,7 @@ import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -45,9 +47,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ListenerContainer<ServiceCacheEventListener<T>>   listenerContainer = new ListenerContainer<>();
     private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
+    private final AtomicReference<State>                            state = new AtomicReference<>(State.LATENT);
     private final PathChildrenCache                                 cache;
     private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
 
@@ -125,10 +127,10 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
         listenerContainer.forEach
             (
-                new Function<ServiceCacheListener, Void>()
+                new Function<ServiceCacheEventListener<T>, Void>()
                 {
                     @Override
-                    public Void apply(ServiceCacheListener listener)
+                    public Void apply(ServiceCacheEventListener<T> listener)
                     {
                         discovery.getClient().getConnectionStateListenable().removeListener(listener);
                         return null;
@@ -143,98 +145,114 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     }
 
     @Override
+    public Listenable<ServiceCacheEventListener<T>> getCacheEventListenable()
+    {
+        return listenerContainer;
+    }
+
+    @Override
     public void addListener(ServiceCacheListener listener)
     {
-        listenerContainer.addListener(listener);
-        discovery.getClient().getConnectionStateListenable().addListener(listener);
+        ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener);
+        listenerContainer.addListener(wrapped);
+        discovery.getClient().getConnectionStateListenable().addListener(wrapped);
     }
 
     @Override
     public void addListener(ServiceCacheListener listener, Executor executor)
     {
-        listenerContainer.addListener(listener, executor);
-        discovery.getClient().getConnectionStateListenable().addListener(listener, executor);
+        ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener);
+        listenerContainer.addListener(wrapped, executor);
+        discovery.getClient().getConnectionStateListenable().addListener(wrapped, executor);
     }
 
     @Override
-    public void removeListener(ServiceCacheListener listener)
+    public void removeListener(final ServiceCacheListener listener)
     {
-        listenerContainer.removeListener(listener);
-        discovery.getClient().getConnectionStateListenable().removeListener(listener);
+        listenerContainer.forEach
+        (
+            new Function<ServiceCacheEventListener<T>, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheEventListener<T> eventListener)
+                {
+                    if ( Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) )
+                    {
+                        listenerContainer.removeListener(eventListener);
+                        discovery.getClient().getConnectionStateListenable().removeListener(eventListener);
+                    }
+                    return null;
+                }
+            }
+        );
     }
 
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-           final Tuple<T> tuple;
            switch ( event.getType() )
         {
             case CHILD_ADDED:
-                   tuple = addOrUpdateInstance(event.getData());
-                listenerContainer.forEach(
-                    new Function<ServiceCacheListener, Void>()
+            {
+                final Tuple<T> tuple = addOrUpdateInstance(event.getData());
+                listenerContainer.forEach
+                (
+                    new Function<ServiceCacheEventListener<T>, Void>()
                     {
                         @Override
-                        public Void apply(ServiceCacheListener listener)
+                        public Void apply(ServiceCacheEventListener<T> listener)
                         {
-                            listener.cacheChanged();
+                            listener.cacheAdded(tuple.newInstance);
+                            return null;
+                        }
+                    }
+                );
+                break;
+            }
 
-                            if ( listener instanceof ServiceCacheEventListener )
+        case CHILD_UPDATED:
+            {
+                final Tuple<T> tuple = addOrUpdateInstance(event.getData());
+                listenerContainer.forEach
+                (
+                    new Function<ServiceCacheEventListener<T>, Void>()
+                    {
+                        @Override
+                        public Void apply(ServiceCacheEventListener<T> listener)
+                        {
+                            if ( tuple.oldInstance != null )
                             {
-                                //noinspection unchecked
-                                ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance);
+                                listener.cacheUpdated(tuple.oldInstance, tuple.newInstance);
+                            }
+                            else
+                            {
+                                listener.cacheAdded(tuple.newInstance);
                             }
-
                             return null;
                         }
                     }
                 );
                 break;
-            case CHILD_UPDATED:
-            {
-                   tuple = addOrUpdateInstance(event.getData());
-                   listenerContainer.forEach(
-                                   new Function<ServiceCacheListener, Void>()
-                                   {
-                                           @Override
-                                           public Void apply(ServiceCacheListener listener)
-                                           {
-                                                   listener.cacheChanged();
-
-                                                   if( listener instanceof ServiceCacheEventListener )
-                                                   {
-                                    //noinspection unchecked
-                                    ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance);
-                                                   }
-
-                                                   return null;
-                                           }
-                                   }
-                   );
-                break;
             }
 
             case CHILD_REMOVED:
             {
                 final ServiceInstance<T> serviceInstance = instances.remove(instanceIdFromData(event.getData()));
-                   listenerContainer.forEach(
-                                   new Function<ServiceCacheListener, Void>()
-                                   {
-                                           @Override
-                                           public Void apply(ServiceCacheListener listener)
-                                           {
-                                                   listener.cacheChanged();
-
-                                                   if ( listener instanceof ServiceCacheEventListener )
-                                                   {
-                                    //noinspection unchecked
-                                    ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance);
-                                                   }
-
-                                                   return null;
-                                           }
-                                   }
-                   );
+                if ( serviceInstance != null )
+                {
+                    listenerContainer.forEach
+                    (
+                        new Function<ServiceCacheEventListener<T>, Void>()
+                        {
+                            @Override
+                            public Void apply(ServiceCacheEventListener<T> listener)
+                            {
+                                listener.cacheDeleted(serviceInstance);
+                                return null;
+                            }
+                        }
+                    );
+                }
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
new file mode 100644
index 0000000..6f14178
--- /dev/null
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
@@ -0,0 +1,53 @@
+package org.apache.curator.x.discovery.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.discovery.ServiceInstance;
+
+class ServiceCacheListenerWrapper<T> implements ServiceCacheEventListener<T>
+{
+    private final ServiceCacheListener listener;
+
+    static <T> ServiceCacheListenerWrapper<T> wrap(ServiceCacheListener listener)
+    {
+        return new ServiceCacheListenerWrapper<>(listener);
+    }
+
+    static ServiceCacheListener unwrap(ServiceCacheEventListener<?> eventListener)
+    {
+        if ( eventListener instanceof ServiceCacheListenerWrapper )
+        {
+            return ((ServiceCacheListenerWrapper)eventListener).listener;
+        }
+        return null;
+    }
+
+    private ServiceCacheListenerWrapper(ServiceCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        listener.stateChanged(client, newState);
+    }
+
+    @Override
+    public void cacheAdded(ServiceInstance<T> added)
+    {
+        listener.cacheChanged();
+    }
+
+    @Override
+    public void cacheDeleted(ServiceInstance<T> deleted)
+    {
+        listener.cacheChanged();
+    }
+
+    @Override
+    public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated)
+    {
+        listener.cacheChanged();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/2ebd4569/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index 65b7cb9..70b41e3 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -330,10 +330,10 @@ public class TestServiceCache extends BaseClassForTests
             ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build();
             closeables.add(cache);
 
-            final CountDownLatch latch = new CountDownLatch(6);
+            final CountDownLatch latch = new CountDownLatch(3);
 
             final AtomicBoolean notifyError = new AtomicBoolean(false);
-            ServiceCacheListener listener = new ServiceCacheEventListener<String>()
+            ServiceCacheEventListener<String> listener = new ServiceCacheEventListener<String>()
             {
                 @Override
                 public void cacheAdded(final ServiceInstance<String> added) {
@@ -358,18 +358,11 @@ public class TestServiceCache extends BaseClassForTests
                 }
 
                 @Override
-                public void cacheChanged()
-                {
-                    latch.countDown();
-                }
-
-                @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-
                 }
             };
-            cache.addListener(listener);
+            cache.getCacheEventListenable().addListener(listener);
             cache.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("before").name("test").port(10064).build();

Reply via email to