[ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f07ee8b5
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f07ee8b5
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f07ee8b5

Branch: refs/heads/master
Commit: f07ee8b59e226579a55c3463329abf912a2ae291
Parents: fa57fb2
Author: Christian Schneider <cschn...@adobe.com>
Authored: Wed Feb 7 16:52:07 2018 +0100
Committer: Christian Schneider <cschn...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |   7 +-
 .../publish/PublishingEndpointListener.java     | 178 ++------------
 .../repository/ZookeeperEndpointRepository.java | 239 +++++++++++++++++++
 .../zookeeper/subscribe/InterfaceMonitor.java   |   4 +-
 .../subscribe/InterfaceMonitorManager.java      |   4 +-
 .../rsa/discovery/zookeeper/util/Utils.java     |  57 -----
 .../publish/PublishingEndpointListenerTest.java |  23 +-
 .../ZookeeperEndpointRepositoryTest.java        | 116 +++++++++
 .../subscribe/InterfaceMonitorTest.java         |   4 +-
 .../rsa/discovery/zookeeper/util/UtilsTest.java |  37 ---
 10 files changed, 383 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index c8b020d..d265a22 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import 
org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
+import 
org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import 
org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
 import 
org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
 import org.apache.zookeeper.WatchedEvent;
@@ -55,6 +56,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
 
     private Dictionary<String, ?> curConfiguration;
 
+    private ZookeeperEndpointRepository repository;
+
     public ZooKeeperDiscovery(BundleContext bctx) {
         this.bctx = bctx;
     }
@@ -92,7 +95,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
             return;
         }
         LOG.debug("starting ZookeeperDiscovery");
-        endpointListener = new PublishingEndpointListener(zkClient, bctx);
+        repository = new ZookeeperEndpointRepository(zkClient);
+        endpointListener = new PublishingEndpointListener(repository);
         endpointListener.start(bctx);
         imManager = new InterfaceMonitorManager(bctx, zkClient);
         endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
@@ -108,7 +112,6 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         closed |= close;
         if (endpointListener != null) {
             endpointListener.stop();
-            endpointListener.close();
         }
         if (endpointListenerTracker != null) {
             endpointListenerTracker.close();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index c3bf01f..ceef6b0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -18,29 +18,11 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.publish;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
 
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import 
org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
@@ -49,28 +31,23 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Listens for local Endpoints and publishes them to ZooKeeper.
+ * Listens for local EndpointEvents using old and new style listeners and 
publishes changes to 
+ * the ZooKeeperEndpointRepository
  */
 @SuppressWarnings("deprecation")
 public class PublishingEndpointListener implements EndpointEventListener, 
EndpointListener {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PublishingEndpointListener.class);
 
-    private final ZooKeeper zk;
-    private final List<EndpointDescription> endpoints = new 
ArrayList<EndpointDescription>();
-    private boolean closed;
-    private final EndpointDescriptionParser endpointDescriptionParser;
-
     private ServiceRegistration<?> listenerReg;
+    private ZookeeperEndpointRepository repository;
 
-    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
-        this.zk = zk;
-        endpointDescriptionParser = new EndpointDescriptionParser();
+    public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
+        this.repository = repository;
     }
     
     public void start(BundleContext bctx) {
@@ -109,155 +86,28 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
     
     private void endpointModified(EndpointDescription endpoint, String filter) 
{
         try {
-            modifyEndpoint(endpoint);
+            repository.modify(endpoint);
         } catch (Exception e) {
             LOG.error("Error modifying endpoint data in zookeeper for endpoint 
{}", endpoint.getId(), e);
         }
     }
 
-    private void modifyEndpoint(EndpointDescription endpoint) throws 
URISyntaxException, KeeperException, InterruptedException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        Map<String, Object> props = new HashMap<String, 
Object>(endpoint.getProperties());
-
-        LOG.info("Changing endpoint in zookeeper: {}", endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.info("Changing ZooKeeper node for service with path {}", 
fullPath);
-            createPath(path, zk);
-            zk.setData(fullPath, getData(endpoint), -1);
-        }
-    }
-
     @Override
     public void endpointAdded(EndpointDescription endpoint, String 
matchedFilter) {
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (endpoints.contains(endpoint)) {
-                // TODO -> Should the published endpoint be updated here?
-                return;
-            }
-
-            try {
-                addEndpoint(endpoint);
-                endpoints.add(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the addition of an 
endpoint.", ex);
-            }
-        }
-    }
-    
-    private byte[] getData(EndpointDescription epd) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        endpointDescriptionParser.writeEndpoint(epd, bos);
-        return bos.toByteArray();
-    }
-
-    private void addEndpoint(EndpointDescription endpoint) throws 
URISyntaxException, KeeperException,
-                                                                  
InterruptedException, IOException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.info("Creating ZooKeeper node for service with path {}", 
fullPath);
-            createPath(path, zk);
-            createEphemeralNode(fullPath, getData(endpoint));
-        }
-    }
-
-    private void createEphemeralNode(String fullPath, byte[] data) throws 
KeeperException, InterruptedException {
         try {
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
-        } catch (NodeExistsException nee) {
-            // this sometimes happens after a ZooKeeper node dies and the 
ephemeral node
-            // that belonged to the old session was not yet deleted. We need 
to make our
-            // session the owner of the node so it won't get deleted 
automatically -
-            // we do this by deleting and recreating it ourselves.
-            LOG.info("node for endpoint already exists, recreating: {}", 
fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (NoNodeException nne) {
-                // it's a race condition, but as long as it got deleted - it's 
ok
-            }
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+            repository.add(endpoint);
+        } catch (Exception ex) {
+            LOG.error("Exception while processing the addition of an 
endpoint.", ex);
         }
     }
 
     @Override
     public void endpointRemoved(EndpointDescription endpoint, String 
matchedFilter) {
-        LOG.info("Local EndpointDescription removed: {}", endpoint);
-
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (!endpoints.contains(endpoint)) {
-                return;
-            }
-
-            try {
-                removeEndpoint(endpoint);
-                endpoints.remove(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the removal of an 
endpoint", ex);
-            }
-        }
-    }
-
-    private void removeEndpoint(EndpointDescription endpoint) throws 
UnknownHostException, URISyntaxException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.debug("Removing ZooKeeper node: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (Exception ex) {
-                LOG.debug("Error while removing endpoint: {}", ex); // e.g. 
session expired
-            }
-        }
-    }
-
-    private static void createPath(String path, ZooKeeper zk) throws 
KeeperException, InterruptedException {
-        StringBuilder current = new StringBuilder();
-        List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
-        for (String part : parts) {
-            current.append('/');
-            current.append(part);
-            try {
-                if (zk.exists(current.toString(), false) == null) {
-                    zk.create(current.toString(), new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                }
-            } catch (NodeExistsException nee) {
-                // it's not the first node with this path to ever exist - 
that's normal
-            }
+        try {
+            repository.remove(endpoint);
+        } catch (Exception ex) {
+            LOG.error("Exception while processing the removal of an endpoint", 
ex);
         }
     }
 
-    private static String getKey(EndpointDescription endpoint) throws 
URISyntaxException {
-        URI uri = new URI(endpoint.getId());
-        return new 
StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
-            .append("#").append(uri.getPath().replace('/', '#')).toString();
-    }
-
-    public void close() {
-        LOG.debug("closing - removing all endpoints");
-        synchronized (endpoints) {
-            closed = true;
-            for (EndpointDescription endpoint : endpoints) {
-                try {
-                    removeEndpoint(endpoint);
-                } catch (Exception ex) {
-                    LOG.error("Exception while removing endpoint during 
close", ex);
-                }
-            }
-            endpoints.clear();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
new file mode 100644
index 0000000..2349c45
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -0,0 +1,239 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperEndpointRepository implements Closeable, Watcher {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
+    private final ZooKeeper zk;
+    private final EndpointDescriptionParser parser;
+    private EndpointEventListener listener;
+    public static final String PATH_PREFIX = "/osgi/service_registry";
+    
+    private Map<String, EndpointDescription> nodes = new 
ConcurrentHashMap<String, EndpointDescription>();
+    
+    public ZookeeperEndpointRepository(ZooKeeper zk) {
+        this(zk, null);
+    }
+    
+    public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener 
listener) {
+        this.zk = zk;
+        this.listener = listener;
+        this.parser = new EndpointDescriptionParser();
+        try {
+            createPath(PATH_PREFIX);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create base path");
+        }
+        // Not yet needed
+        //this.registerWatcher();
+    }
+
+    private void registerWatcher() {
+        try {
+            List<String> children = 
zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
+            System.out.println(children);
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+    
+    protected void notifyListener(WatchedEvent wevent) {
+        EndpointDescription ep = read(wevent.getPath());
+        if (ep != null) {
+            int type = getEndpointEventType(wevent);
+            EndpointEvent event = new EndpointEvent(type, ep);
+            listener.endpointChanged(event, null);
+        }
+    }
+    
+    private int getEndpointEventType(WatchedEvent wevent) {
+        EventType type = wevent.getType();
+        return EndpointEvent.ADDED;
+    }
+
+    /**
+     * Retrieves data from the given node and parses it into an 
EndpointDescription.
+     *
+     * @param path a node path
+     * @return endpoint found in the node or null if no endpoint was found
+     */
+    public EndpointDescription read(String path) {
+        try {
+            Stat stat = zk.exists(path, false);
+            if (stat == null || stat.getDataLength() <= 0) {
+                return null;
+            }
+            byte[] data = zk.getData(path, false, null);
+            LOG.debug("Got data for node: {}", path);
+
+            EndpointDescription endpoint = parser.readEndpoint(new 
ByteArrayInputStream(data));
+            if (endpoint != null) {
+                return endpoint;
+            }
+            LOG.warn("No Discovery information found for node: {}", path);
+        } catch (Exception e) {
+            LOG.error("Problem getting EndpointDescription from node " + path, 
e);
+        }
+        return null;
+    }
+
+    public void add(EndpointDescription endpoint) throws URISyntaxException, 
KeeperException,
+    InterruptedException, IOException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+    
+        LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Creating ZooKeeper node for service with path {}", 
fullPath);
+            createPath(path);
+            createEphemeralNode(fullPath, getData(endpoint));
+        }
+    }
+
+    public void modify(EndpointDescription endpoint) throws 
URISyntaxException, KeeperException, InterruptedException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+
+        LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Changing ZooKeeper node for service with path {}", 
fullPath);
+            createPath(path);
+            zk.setData(fullPath, getData(endpoint), -1);
+        }
+    }
+    
+    public void remove(EndpointDescription endpoint) throws 
UnknownHostException, URISyntaxException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Removing ZooKeeper node: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (Exception ex) {
+                LOG.debug("Error while removing endpoint: {}", ex); // e.g. 
session expired
+            }
+        }
+    }
+    
+    public List<EndpointDescription> getAll() throws KeeperException, 
InterruptedException {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    private byte[] getData(EndpointDescription epd) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        parser.writeEndpoint(epd, bos);
+        return bos.toByteArray();
+    }
+    
+    private void createEphemeralNode(String fullPath, byte[] data) throws 
KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the 
ephemeral node
+            // that belonged to the old session was not yet deleted. We need 
to make our
+            // session the owner of the node so it won't get deleted 
automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", 
fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's 
ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+        }
+    }
+    
+    private void createPath(String path) throws KeeperException, 
InterruptedException {
+        StringBuilder current = new StringBuilder();
+        List<String> parts = 
ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/")));
+        for (String part : parts) {
+            current.append('/');
+            current.append(part);
+            try {
+                if (zk.exists(current.toString(), false) == null) {
+                    zk.create(current.toString(), new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+            } catch (NodeExistsException nee) {
+                // it's not the first node with this path to ever exist - 
that's normal
+            }
+        }
+    }
+
+    /**
+     * Removes nulls and empty strings from the given string array.
+     *
+     * @param strings an array of strings
+     * @return a new array containing the non-null and non-empty
+     *         elements of the original array in the same order
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> result = new ArrayList<String>();
+        if (strings == null) {
+            return result;
+        }
+        for (String s : strings) {
+            if (s != null && !s.isEmpty()) {
+                result.add(s);
+            }
+        }
+        return result;
+    }
+
+    public static String getZooKeeperPath(String name) {
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + 
'/' + name.replace('.', '/');
+    }
+
+    private static String getKey(EndpointDescription endpoint) throws 
URISyntaxException {
+        URI uri = new URI(endpoint.getId());
+        return new 
StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
+            .append("#").append(uri.getPath().replace('/', '#')).toString();
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 7078bb8..2c90b3c 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import 
org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -67,7 +67,7 @@ public class InterfaceMonitor implements Watcher, StatCallback {
 
     public InterfaceMonitor(ZooKeeper zk, String objClass, 
EndpointEventListener endpointListener, String scope) {
         this.zk = zk;
-        this.znode = Utils.getZooKeeperPath(objClass);
+        this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
         this.recursive = objClass == null || objClass.isEmpty();
         this.endpointListener = endpointListener;
         this.parser = new EndpointDescriptionParser();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 7d6e4ae..0aa98b3 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import 
org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.util.StringPlus;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
@@ -250,7 +250,7 @@ public class InterfaceMonitorManager {
     }
 
     protected List<String> getScopes(ServiceReference<?> sref) {
-        return 
Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)));
+        return 
StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
     }
     
     public static String getObjectClass(String scope) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index 289ae32..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
-    static final String PATH_PREFIX = "/osgi/service_registry";
-
-    private Utils() {
-        // never constructed
-    }
-
-    public static String getZooKeeperPath(String name) {
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + 
'/' + name.replace('.', '/');
-    }
-
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<String>();
-        if (strings == null) {
-            return result;
-        }
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index b7debf6..a61cf76 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -23,13 +23,13 @@ import static org.easymock.EasyMock.expect;
 import java.util.HashMap;
 import java.util.Map;
 
+import 
org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
-import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -44,7 +44,6 @@ public class PublishingEndpointListenerTest extends TestCase {
     public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
         IMocksControl c = EasyMock.createNiceControl();
 
-        BundleContext ctx = c.createMock(BundleContext.class);
         ZooKeeper zk = c.createMock(ZooKeeper.class);
 
         String path = ENDPOINT_PATH;
@@ -53,7 +52,8 @@ public class PublishingEndpointListenerTest extends TestCase {
 
         c.replay();
 
-        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
+        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
+        PublishingEndpointListener eli = new PublishingEndpointListener(repository);
         EndpointDescription endpoint = createEndpoint();
         eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
         eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing
@@ -63,23 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
         c.verify();
     }
 
-    public void testClose() throws KeeperException, InterruptedException {
-        IMocksControl c = EasyMock.createNiceControl();
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        expectCreated(zk, ENDPOINT_PATH);
-        expectDeleted(zk, ENDPOINT_PATH);
-
-        c.replay();
-
-        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
-        EndpointDescription endpoint = createEndpoint();
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
-        eli.close(); // should result in zk.delete(...)
-
-        c.verify();
-    }
-
     private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
         expect(zk.create(EasyMock.eq(path), 
                          (byte[])EasyMock.anyObject(), 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
new file mode 100644
index 0000000..3a20f5a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -0,0 +1,116 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class ZookeeperEndpointRepositoryTest {
+    
+    private ZooKeeperServer server;
+    private ZooKeeper zk;
+    private ServerCnxnFactory factory;
+
+    @Before
+    public void before() throws IOException, InterruptedException, KeeperException {
+        File target = new File("target");
+        File zookeeperDir = new File(target, "zookeeper");
+        server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+        factory = new NIOServerCnxnFactory();
+        int clientPort = getClientPort();
+        factory.configure(new InetSocketAddress(clientPort), 10);
+        factory.startup(server);
+        Watcher watcher = new Watcher() {
+
+            @Override
+            public void process(WatchedEvent event) {
+                System.out.println(event);
+            }
+            
+        };
+        zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+        printNodes("/");
+    }
+    
+    private int getClientPort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        zk.close();
+        factory.shutdown();
+    }
+
+    @Test
+    public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+        EndpointEventListener listener = new EndpointEventListener() {
+            
+            @Override
+            public void endpointChanged(EndpointEvent event, String filter) {
+                System.out.println(event);
+            }
+        };
+        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+        EndpointDescription endpoint = createEndpoint();
+        repository.add(endpoint);
+        
+        String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
+        EndpointDescription ep2 = repository.read(path);
+        repository.close();
+    }
+    
+    @Test
+    public void testGetZooKeeperPath() {
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "org/example/Test",
+            ZookeeperEndpointRepository.getZooKeeperPath("org.example.Test"));
+
+        // used for the recursive discovery
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(null));
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+    }
+
+    private EndpointDescription createEndpoint() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(Constants.OBJECTCLASS, new String[] {Runnable.class.getName()});
+        props.put(RemoteConstants.ENDPOINT_ID, "http://test.de/service1");
+        props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "my");
+
+        EndpointDescription endpoint = new EndpointDescription(props);
+        return endpoint;
+    }
+
+    public void printNodes(String path) throws KeeperException, InterruptedException {
+        List<String> children = zk.getChildren(path, false);
+        for (String child : children) {
+            String newPath = path.endsWith("/") ? path : path + "/";
+            String fullPath = newPath + child;
+            System.out.println(fullPath);
+            printNodes(fullPath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index 53ddbc4..e09cfbf 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -24,7 +24,7 @@ import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.Collections;
 
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -48,7 +48,7 @@ public class InterfaceMonitorTest extends TestCase {
 
         String scope = "(myProp=test)";
         String interf = "es.schaaf.test";
-        String node = Utils.getZooKeeperPath(interf);
+        String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
 
         EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
         InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
deleted file mode 100644
index 4d41fb0..0000000
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-
-import junit.framework.TestCase;
-
-public class UtilsTest extends TestCase {
-
-    public void testGetZooKeeperPath() {
-        assertEquals(Utils.PATH_PREFIX + '/' + "org/example/Test",
-            Utils.getZooKeeperPath("org.example.Test"));
-
-        // used for the recursive discovery
-        assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(null));
-        assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(""));
-    }
-
-
-}

Reply via email to