[ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f07ee8b5 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f07ee8b5 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f07ee8b5 Branch: refs/heads/master Commit: f07ee8b59e226579a55c3463329abf912a2ae291 Parents: fa57fb2 Author: Christian Schneider <[email protected]> Authored: Wed Feb 7 16:52:07 2018 +0100 Committer: Christian Schneider <[email protected]> Committed: Thu Feb 8 17:40:34 2018 +0100 ---------------------------------------------------------------------- .../discovery/zookeeper/ZooKeeperDiscovery.java | 7 +- .../publish/PublishingEndpointListener.java | 178 ++------------ .../repository/ZookeeperEndpointRepository.java | 239 +++++++++++++++++++ .../zookeeper/subscribe/InterfaceMonitor.java | 4 +- .../subscribe/InterfaceMonitorManager.java | 4 +- .../rsa/discovery/zookeeper/util/Utils.java | 57 ----- .../publish/PublishingEndpointListenerTest.java | 23 +- .../ZookeeperEndpointRepositoryTest.java | 116 +++++++++ .../subscribe/InterfaceMonitorTest.java | 4 +- .../rsa/discovery/zookeeper/util/UtilsTest.java | 37 --- 10 files changed, 383 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java index c8b020d..d265a22 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker; import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager; import org.apache.zookeeper.WatchedEvent; @@ -55,6 +56,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService { private Dictionary<String, ?> curConfiguration; + private ZookeeperEndpointRepository repository; + public ZooKeeperDiscovery(BundleContext bctx) { this.bctx = bctx; } @@ -92,7 +95,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService { return; } LOG.debug("starting ZookeeperDiscovery"); - endpointListener = new PublishingEndpointListener(zkClient, bctx); + repository = new ZookeeperEndpointRepository(zkClient); + endpointListener = new PublishingEndpointListener(repository); endpointListener.start(bctx); imManager = new InterfaceMonitorManager(bctx, zkClient); endpointListenerTracker = new EndpointListenerTracker(bctx, imManager); @@ -108,7 +112,6 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService { closed |= close; if (endpointListener != null) { endpointListener.stop(); - endpointListener.close(); } if (endpointListenerTracker != null) { endpointListenerTracker.close(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java index c3bf01f..ceef6b0 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java @@ -18,29 +18,11 @@ */ package org.apache.aries.rsa.discovery.zookeeper.publish; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Dictionary; -import java.util.HashMap; import java.util.Hashtable; -import java.util.List; -import java.util.Map; -import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.aries.rsa.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.framework.ServiceRegistration; @@ -49,28 +31,23 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.osgi.service.remoteserviceadmin.EndpointListener; import org.osgi.service.remoteserviceadmin.RemoteConstants; -import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Listens for local Endpoints and publishes them to ZooKeeper. + * Listens for local EndpointEvents using old and new style listeners and publishes changes to + * the ZooKeeperEndpointRepository */ @SuppressWarnings("deprecation") public class PublishingEndpointListener implements EndpointEventListener, EndpointListener { private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); - private final ZooKeeper zk; - private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); - private boolean closed; - private final EndpointDescriptionParser endpointDescriptionParser; - private ServiceRegistration<?> listenerReg; + private ZookeeperEndpointRepository repository; - public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) { - this.zk = zk; - endpointDescriptionParser = new EndpointDescriptionParser(); + public PublishingEndpointListener(ZookeeperEndpointRepository repository) { + this.repository = repository; } public void start(BundleContext bctx) { @@ -109,155 +86,28 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi private void endpointModified(EndpointDescription endpoint, String filter) { try { - modifyEndpoint(endpoint); + repository.modify(endpoint); } catch (Exception e) { LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e); } } - private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint); - Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties()); - - LOG.info("Changing endpoint in zookeeper: {}", endpoint); - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.info("Changing ZooKeeper node for service with path {}", fullPath); - createPath(path, zk); - zk.setData(fullPath, getData(endpoint), -1); - } - } - @Override public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - synchronized (endpoints) { - if (closed) { - return; - } - if (endpoints.contains(endpoint)) { - // TODO -> Should the published endpoint be updated here? - return; - } - - try { - addEndpoint(endpoint); - endpoints.add(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the addition of an endpoint.", ex); - } - } - } - - private byte[] getData(EndpointDescription epd) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - endpointDescriptionParser.writeEndpoint(epd, bos); - return bos.toByteArray(); - } - - private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, - InterruptedException, IOException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint); - LOG.info("Exporting endpoint to zookeeper: {}", endpoint); - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.info("Creating ZooKeeper node for service with path {}", fullPath); - createPath(path, zk); - createEphemeralNode(fullPath, getData(endpoint)); - } - } - - private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException { try { - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (NodeExistsException nee) { - // this sometimes happens after a ZooKeeper node dies and the ephemeral node - // that belonged to the old session was not yet deleted. We need to make our - // session the owner of the node so it won't get deleted automatically - - // we do this by deleting and recreating it ourselves. - LOG.info("node for endpoint already exists, recreating: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (NoNodeException nne) { - // it's a race condition, but as long as it got deleted - it's ok - } - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + repository.add(endpoint); + } catch (Exception ex) { + LOG.error("Exception while processing the addition of an endpoint.", ex); } } @Override public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - LOG.info("Local EndpointDescription removed: {}", endpoint); - - synchronized (endpoints) { - if (closed) { - return; - } - if (!endpoints.contains(endpoint)) { - return; - } - - try { - removeEndpoint(endpoint); - endpoints.remove(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the removal of an endpoint", ex); - } - } - } - - private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint); - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.debug("Removing ZooKeeper node: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (Exception ex) { - LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired - } - } - } - - private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException { - StringBuilder current = new StringBuilder(); - List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/"))); - for (String part : parts) { - current.append('/'); - current.append(part); - try { - if (zk.exists(current.toString(), false) == null) { - zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (NodeExistsException nee) { - // it's not the first node with this path to ever exist - that's normal - } + try { + repository.remove(endpoint); + } catch (Exception ex) { + LOG.error("Exception while processing the removal of an endpoint", ex); } } - private static String getKey(EndpointDescription endpoint) throws URISyntaxException { - URI uri = new URI(endpoint.getId()); - return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort()) - .append("#").append(uri.getPath().replace('/', '#')).toString(); - } - - public void close() { - LOG.debug("closing - removing all endpoints"); - synchronized (endpoints) { - closed = true; - for (EndpointDescription endpoint : endpoints) { - try { - removeEndpoint(endpoint); - } catch (Exception ex) { - LOG.error("Exception while removing endpoint during close", ex); - } - } - endpoints.clear(); - } - } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java new file mode 100644 index 0000000..2349c45 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java @@ -0,0 +1,239 @@ +package org.apache.aries.rsa.discovery.zookeeper.repository; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZookeeperEndpointRepository implements Closeable, Watcher { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class); + private final ZooKeeper zk; + private final EndpointDescriptionParser parser; + private EndpointEventListener listener; + public static final String PATH_PREFIX = "/osgi/service_registry"; + + private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<String, EndpointDescription>(); + + public ZookeeperEndpointRepository(ZooKeeper zk) { + this(zk, null); + } + + public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener listener) { + this.zk = zk; + this.listener = listener; + this.parser = new EndpointDescriptionParser(); + try { + createPath(PATH_PREFIX); + } catch (Exception e) { + throw new IllegalStateException("Unable to create base path"); + } + // Not yet needed + //this.registerWatcher(); + } + + private void registerWatcher() { + try { + List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this); + System.out.println(children); + } catch (KeeperException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected void notifyListener(WatchedEvent wevent) { + EndpointDescription ep = read(wevent.getPath()); + if (ep != null) { + int type = getEndpointEventType(wevent); + EndpointEvent event = new EndpointEvent(type, ep); + listener.endpointChanged(event, null); + } + } + + private int getEndpointEventType(WatchedEvent wevent) { + EventType type = wevent.getType(); + return EndpointEvent.ADDED; + } + + /** + * Retrieves data from the given node and parses it into an EndpointDescription. + * + * @param path a node path + * @return endpoint found in the node or null if no endpoint was found + */ + public EndpointDescription read(String path) { + try { + Stat stat = zk.exists(path, false); + if (stat == null || stat.getDataLength() <= 0) { + return null; + } + byte[] data = zk.getData(path, false, null); + LOG.debug("Got data for node: {}", path); + + EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data)); + if (endpoint != null) { + return endpoint; + } + LOG.warn("No Discovery information found for node: {}", path); + } catch (Exception e) { + LOG.error("Problem getting EndpointDescription from node " + path, e); + } + return null; + } + + public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException, + InterruptedException, IOException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + + LOG.info("Exporting endpoint to zookeeper: {}", endpoint); + for (String name : interfaces) { + String path = ZookeeperEndpointRepository.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.info("Creating ZooKeeper node for service with path {}", fullPath); + createPath(path); + createEphemeralNode(fullPath, getData(endpoint)); + } + } + + public void modify(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + + LOG.info("Changing endpoint in zookeeper: {}", endpoint); + for (String name : interfaces) { + String path = ZookeeperEndpointRepository.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.info("Changing ZooKeeper node for service with path {}", fullPath); + createPath(path); + zk.setData(fullPath, getData(endpoint), -1); + } + } + + public void remove(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + for (String name : interfaces) { + String path = ZookeeperEndpointRepository.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.debug("Removing ZooKeeper node: {}", fullPath); + try { + zk.delete(fullPath, -1); + } catch (Exception ex) { + LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired + } + } + } + + public List<EndpointDescription> getAll() throws KeeperException, InterruptedException { + return null; + } + + @Override + public void close() throws IOException { + + } + + private byte[] getData(EndpointDescription epd) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + parser.writeEndpoint(epd, bos); + return bos.toByteArray(); + } + + private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException { + try { + zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (NodeExistsException nee) { + // this sometimes happens after a ZooKeeper node dies and the ephemeral node + // that belonged to the old session was not yet deleted. We need to make our + // session the owner of the node so it won't get deleted automatically - + // we do this by deleting and recreating it ourselves. + LOG.info("node for endpoint already exists, recreating: {}", fullPath); + try { + zk.delete(fullPath, -1); + } catch (NoNodeException nne) { + // it's a race condition, but as long as it got deleted - it's ok + } + zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + } + + private void createPath(String path) throws KeeperException, InterruptedException { + StringBuilder current = new StringBuilder(); + List<String> parts = ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/"))); + for (String part : parts) { + current.append('/'); + current.append(part); + try { + if (zk.exists(current.toString(), false) == null) { + zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (NodeExistsException nee) { + // it's not the first node with this path to ever exist - that's normal + } + } + } + + /** + * Removes nulls and empty strings from the given string array. + * + * @param strings an array of strings + * @return a new array containing the non-null and non-empty + * elements of the original array in the same order + */ + public static List<String> removeEmpty(List<String> strings) { + List<String> result = new ArrayList<String>(); + if (strings == null) { + return result; + } + for (String s : strings) { + if (s != null && !s.isEmpty()) { + result.add(s); + } + } + return result; + } + + public static String getZooKeeperPath(String name) { + return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); + } + + private static String getKey(EndpointDescription endpoint) throws URISyntaxException { + URI uri = new URI(endpoint.getId()); + return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort()) + .append("#").append(uri.getPath().replace('/', '#')).toString(); + } + + @Override + public void process(WatchedEvent event) { + + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java index 7078bb8..2c90b3c 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; -import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -67,7 +67,7 @@ public class InterfaceMonitor implements Watcher, StatCallback { public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) { this.zk = zk; - this.znode = Utils.getZooKeeperPath(objClass); + this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass); this.recursive = objClass == null || objClass.isEmpty(); this.endpointListener = endpointListener; this.parser = new EndpointDescriptionParser(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java index 7d6e4ae..0aa98b3 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java @@ -30,7 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.aries.rsa.util.StringPlus; import org.apache.zookeeper.ZooKeeper; import org.osgi.framework.BundleContext; @@ -250,7 +250,7 @@ public class InterfaceMonitorManager { } protected List<String> getScopes(ServiceReference<?> sref) { - return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE))); + return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)); } public static String getObjectClass(String scope) { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java deleted file mode 100644 index 289ae32..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.aries.rsa.discovery.zookeeper.util; - -import java.util.ArrayList; -import java.util.List; - -public final class Utils { - - static final String PATH_PREFIX = "/osgi/service_registry"; - - private Utils() { - // never constructed - } - - public static String getZooKeeperPath(String name) { - return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); - } - - /** - * Removes nulls and empty strings from the given string array. - * - * @param strings an array of strings - * @return a new array containing the non-null and non-empty - * elements of the original array in the same order - */ - public static List<String> removeEmpty(List<String> strings) { - List<String> result = new ArrayList<String>(); - if (strings == null) { - return result; - } - for (String s : strings) { - if (s != null && !s.isEmpty()) { - result.add(s); - } - } - return result; - } - - -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java index b7debf6..a61cf76 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java @@ -23,13 +23,13 @@ import static org.easymock.EasyMock.expect; import java.util.HashMap; import java.util.Map; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.easymock.EasyMock; import org.easymock.IMocksControl; -import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; @@ -44,7 +44,6 @@ public class PublishingEndpointListenerTest extends TestCase { public void testEndpointRemovalAdding() throws KeeperException, InterruptedException { IMocksControl c = EasyMock.createNiceControl(); - BundleContext ctx = c.createMock(BundleContext.class); ZooKeeper zk = c.createMock(ZooKeeper.class); String path = ENDPOINT_PATH; @@ -53,7 +52,8 @@ public class PublishingEndpointListenerTest extends TestCase { c.replay(); - PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx); + ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk); + PublishingEndpointListener eli = new PublishingEndpointListener(repository); EndpointDescription endpoint = createEndpoint(); eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing @@ -63,23 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase { c.verify(); } - public void testClose() throws KeeperException, InterruptedException { - IMocksControl c = EasyMock.createNiceControl(); - BundleContext ctx = c.createMock(BundleContext.class); - ZooKeeper zk = c.createMock(ZooKeeper.class); - expectCreated(zk, ENDPOINT_PATH); - expectDeleted(zk, ENDPOINT_PATH); - - c.replay(); - - PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx); - EndpointDescription endpoint = createEndpoint(); - eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); - eli.close(); // should result in zk.delete(...) - - c.verify(); - } - private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException { expect(zk.create(EasyMock.eq(path), (byte[])EasyMock.anyObject(), http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java new file mode 100644 index 0000000..3a20f5a --- /dev/null +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java @@ -0,0 +1,116 @@ +package org.apache.aries.rsa.discovery.zookeeper.repository; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.osgi.framework.Constants; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +public class ZookeeperEndpointRepositoryTest { + + private ZooKeeperServer server; + private ZooKeeper zk; + private ServerCnxnFactory factory; + + @Before + public void before() throws IOException, InterruptedException, KeeperException { + File target = new File("target"); + File zookeeperDir = new File(target, "zookeeper"); + server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000); + factory = new NIOServerCnxnFactory(); + int clientPort = getClientPort(); + factory.configure(new InetSocketAddress(clientPort), 10); + factory.startup(server); + Watcher watcher = new Watcher() { + + @Override + public void process(WatchedEvent event) { + System.out.println(event); + } + + }; + zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher); + printNodes("/"); + } + + private int getClientPort() throws IOException { + try (ServerSocket serverSocket = new ServerSocket(0)) { + return serverSocket.getLocalPort(); + } + } + + @After + public void after() throws InterruptedException { + zk.close(); + factory.shutdown(); + } + + @Test + public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException { + EndpointEventListener listener = new EndpointEventListener() { + + @Override + public void endpointChanged(EndpointEvent event, String filter) { + System.out.println(event); + } + }; + ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener); + EndpointDescription endpoint = createEndpoint(); + repository.add(endpoint); + + String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1"; + EndpointDescription ep2 = repository.read(path); + repository.close(); + } + + @Test + public void testGetZooKeeperPath() { + assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "org/example/Test", + ZookeeperEndpointRepository.getZooKeeperPath("org.example.Test")); + + // used for the recursive discovery + assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(null)); + assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath("")); + } + + private EndpointDescription createEndpoint() { + Map<String, Object> props = new HashMap<>(); + props.put(Constants.OBJECTCLASS, new String[] {Runnable.class.getName()}); + props.put(RemoteConstants.ENDPOINT_ID, "http://test.de/service1"); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "my"); + + EndpointDescription endpoint = new EndpointDescription(props); + return endpoint; + } + + public void printNodes(String path) throws KeeperException, InterruptedException { + List<String> children = zk.getChildren(path, false); + for (String child : children) { + String newPath = path.endsWith("/") ? path : path + "/"; + String fullPath = newPath + child; + System.out.println(fullPath); + printNodes(fullPath); + } + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java index 53ddbc4..e09cfbf 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java @@ -24,7 +24,7 @@ import static org.easymock.EasyMock.expectLastCall; import java.util.Collections; -import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; @@ -48,7 +48,7 @@ public class InterfaceMonitorTest extends TestCase { String scope = "(myProp=test)"; String interf = "es.schaaf.test"; - String node = Utils.getZooKeeperPath(interf); + String node = ZookeeperEndpointRepository.getZooKeeperPath(interf); EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class); InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java deleted file mode 100644 index 4d41fb0..0000000 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.aries.rsa.discovery.zookeeper.util; - -import org.apache.aries.rsa.discovery.zookeeper.util.Utils; - -import junit.framework.TestCase; - -public class UtilsTest extends TestCase { - - public void testGetZooKeeperPath() { - assertEquals(Utils.PATH_PREFIX + '/' + "org/example/Test", - Utils.getZooKeeperPath("org.example.Test")); - - // used for the recursive discovery - assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(null)); - assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath("")); - } - - -}
