Repository: aries-rsa Updated Branches: refs/heads/master b7597a4f1 -> 389bae151
[ARIES-1771] Support endpoint changes in zookeeper discovery Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/025516f2 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/025516f2 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/025516f2 Branch: refs/heads/master Commit: 025516f2cbd1a361a588755db1b8961668ce0c8d Parents: b7597a4 Author: Christian Schneider <[email protected]> Authored: Tue Feb 6 16:54:28 2018 +0100 Committer: Christian Schneider <[email protected]> Committed: Tue Feb 6 16:54:28 2018 +0100 ---------------------------------------------------------------------- .../discovery/zookeeper/ZooKeeperDiscovery.java | 3 +- .../publish/PublishingEndpointListener.java | 50 +++++++-- .../PublishingEndpointListenerFactory.java | 5 +- .../subscribe/EndpointListenerTracker.java | 25 ++++- .../zookeeper/subscribe/InterfaceMonitor.java | 17 ++- .../subscribe/InterfaceMonitorManager.java | 106 +++++++++++-------- .../PublishingEndpointListenerFactoryTest.java | 61 ++++++----- .../subscribe/InterfaceMonitorTest.java | 4 + 8 files changed, 183 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java index 0e03722..584da35 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java @@ -33,7 +33,6 @@ import org.apache.zookeeper.ZooKeeper; import org.osgi.framework.BundleContext; import org.osgi.service.cm.ConfigurationException; import org.osgi.service.cm.ManagedService; -import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService { private final BundleContext bctx; private PublishingEndpointListenerFactory endpointListenerFactory; - private ServiceTracker<EndpointEventListener, EndpointEventListener> endpointListenerTracker; + private ServiceTracker<?, ?> endpointListenerTracker; private InterfaceMonitorManager imManager; private ZooKeeper zkClient; private boolean closed; http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java index 82387ba..d5fe7a6 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java @@ -38,10 +38,12 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.osgi.framework.BundleContext; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +51,8 @@ import org.slf4j.LoggerFactory; /** * Listens for local Endpoints and publishes them to ZooKeeper. */ -public class PublishingEndpointListener implements EndpointEventListener { +@SuppressWarnings("deprecation") +public class PublishingEndpointListener implements EndpointEventListener, EndpointListener { private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); @@ -79,13 +82,45 @@ public class PublishingEndpointListener implements EndpointEventListener { endpointRemoved(endpoint, filter); break; case EndpointEvent.MODIFIED: - endpointRemoved(endpoint, filter); - endpointAdded(endpoint, filter); + endpointModified(endpoint, filter); break; } } - private void endpointAdded(EndpointDescription endpoint, String matchedFilter) { + private void endpointModified(EndpointDescription endpoint, String filter) { + try { + modifyEndpoint(endpoint); + } catch (Exception e) { + LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e); + } + } + + private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties()); + + // process plugins + Object[] plugins = discoveryPluginTracker.getServices(); + if (plugins != null) { + for (Object plugin : plugins) { + if (plugin instanceof DiscoveryPlugin) { + endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey); + } + } + } + LOG.info("Changing endpoint in zookeeper: {}", endpoint); + for (String name : interfaces) { + String path = Utils.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.info("Changing ZooKeeper node for service with path {}", fullPath); + createPath(path, zk); + zk.setData(fullPath, getData(endpoint), -1); + } + } + + @Override + public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { synchronized (endpoints) { if (closed) { return; @@ -153,7 +188,8 @@ public class PublishingEndpointListener implements EndpointEventListener { } } - private void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { + @Override + public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { LOG.info("Local EndpointDescription removed: {}", endpoint); synchronized (endpoints) { @@ -195,7 +231,9 @@ public class PublishingEndpointListener implements EndpointEventListener { current.append('/'); current.append(part); try { - zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + if (zk.exists(current.toString(), false) == null) { + zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } } catch (NodeExistsException nee) { // it's not the first node with this path to ever exist - that's normal } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java index 8a40b92..444f7bb 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java @@ -31,6 +31,7 @@ import org.osgi.framework.Constants; import org.osgi.framework.ServiceFactory; import org.osgi.framework.ServiceRegistration; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; import org.osgi.service.remoteserviceadmin.RemoteConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; /** * Creates local EndpointListeners that publish to ZooKeeper. */ +@SuppressWarnings("deprecation") public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> { private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class); @@ -78,7 +80,8 @@ public class PublishingEndpointListenerFactory implements ServiceFactory<Publish String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid)); props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true"); - serviceRegistration = bctx.registerService(EndpointEventListener.class.getName(), this, props); + String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()}; + serviceRegistration = bctx.registerService(ifAr, this, props); } public synchronized void stop() { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java index 6e6ed1b..0ed1097 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java @@ -19,37 +19,52 @@ package org.apache.aries.rsa.discovery.zookeeper.subscribe; import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; import org.osgi.util.tracker.ServiceTracker; /** * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage * interest in the scopes of each EndpointListener. */ -public class EndpointListenerTracker extends ServiceTracker<EndpointEventListener, EndpointEventListener> { +@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" }) +public class EndpointListenerTracker extends ServiceTracker { private final InterfaceMonitorManager imManager; public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) { - super(bctx, EndpointEventListener.class, null); + super(bctx, getfilter(), null); this.imManager = imManager; } + + private static Filter getfilter() { + String filterSt = String.format("(|(objectClass=%s)(objectClass=%s))", EndpointEventListener.class.getName(), + EndpointListener.class.getName()); + try { + return FrameworkUtil.createFilter(filterSt); + } catch (InvalidSyntaxException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } @Override - public EndpointEventListener addingService(ServiceReference<EndpointEventListener> endpointListener) { + public Object addingService(ServiceReference endpointListener) { imManager.addInterest(endpointListener); return null; } @Override - public void modifiedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) { + public void modifiedService(ServiceReference endpointListener, Object service) { // called when an EndpointListener updates its service properties, // e.g. when its interest scope is expanded/reduced imManager.addInterest(endpointListener); } @Override - public void removedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) { + public void removedService(ServiceReference endpointListener, Object service) { imManager.removeInterest(endpointListener); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java index 6972989..7078bb8 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java @@ -91,6 +91,13 @@ public class InterfaceMonitor implements Watcher, StatCallback { private void watch() { LOG.debug("registering a ZooKeeper.exists({}) callback", znode); zk.exists(znode, this, this, null); + zk.getData(znode, this, new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + processDelta(); + } + }, null); } /** @@ -199,18 +206,22 @@ public class InterfaceMonitor implements Watcher, StatCallback { EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); if (endpoint != null) { EndpointDescription prevEndpoint = prevNodes.get(child); - LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " - + endpoint.getProperties().values()); + newNodes.put(child, endpoint); prevNodes.remove(child); foundANode = true; LOG.debug("Properties: {}", endpoint.getProperties()); if (prevEndpoint == null) { // This guy is new + LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " + + endpoint.getProperties().values()); EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); endpointListener.endpointChanged(event, null); } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { - // TODO + LOG.info("Found changed node " + zn + "/[" + child + "] ( []->child ) props: " + + endpoint.getProperties().values()); + EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint); + endpointListener.endpointChanged(event, null); } } if (recursive && processChildren(childZNode, newNodes, prevNodes)) { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java index 26e4462..7d6e4ae 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java @@ -33,13 +33,13 @@ import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; import org.apache.aries.rsa.discovery.zookeeper.util.Utils; import org.apache.aries.rsa.util.StringPlus; import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.Filter; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; * The InterfaceMonitor calls back when it detects added or removed external Endpoints. * These events are then forwarded to all interested EndpointEventListeners. */ +@SuppressWarnings({"deprecation", "rawtypes"}) public class InterfaceMonitorManager { private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class); private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*"); @@ -56,14 +57,14 @@ public class InterfaceMonitorManager { private final BundleContext bctx; private final ZooKeeper zk; // map of EndpointEventListeners and the scopes they are interested in - private final Map<ServiceReference<EndpointEventListener>, List<String>> EndpointEventListenerScopes = - new HashMap<ServiceReference<EndpointEventListener>, List<String>>(); + private final Map<ServiceReference, List<String>> epListenerScopes = + new HashMap<ServiceReference, List<String>>(); // map of scopes and their interest data private final Map<String, Interest> interests = new HashMap<String, Interest>(); protected static class Interest { - List<ServiceReference<EndpointEventListener>> EndpointEventListeners = - new CopyOnWriteArrayList<ServiceReference<EndpointEventListener>>(); + List<ServiceReference> epListeners = + new CopyOnWriteArrayList<ServiceReference>(); InterfaceMonitor monitor; } @@ -72,26 +73,26 @@ public class InterfaceMonitorManager { this.zk = zk; } - public void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener) { - if (isOurOwnEndpointEventListener(EndpointEventListener)) { + public void addInterest(ServiceReference<?> eplistener) { + if (isOurOwnEndpointEventListener(eplistener)) { LOG.debug("Skipping our own EndpointEventListener"); return; } - List<String> scopes = getScopes(EndpointEventListener); + List<String> scopes = getScopes(eplistener); LOG.debug("adding Interests: {}", scopes); for (String scope : scopes) { String objClass = getObjectClass(scope); - addInterest(EndpointEventListener, scope, objClass); + addInterest(eplistener, scope, objClass); } } - private static boolean isOurOwnEndpointEventListener(ServiceReference<EndpointEventListener> EndpointEventListener) { + private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) { return Boolean.parseBoolean(String.valueOf( EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); } - public synchronized void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener, + public synchronized void addInterest(ServiceReference epListener, String scope, String objClass) { // get or create interest for given scope and add listener to it Interest interest = interests.get(scope); @@ -99,27 +100,27 @@ public class InterfaceMonitorManager { // create interest, add listener and start monitor interest = new Interest(); interests.put(scope, interest); - interest.EndpointEventListeners.add(EndpointEventListener); // add it before monitor starts so we don't miss events + interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events interest.monitor = createInterfaceMonitor(scope, objClass, interest); interest.monitor.start(); } else { // interest already exists, so just add listener to it - if (!interest.EndpointEventListeners.contains(EndpointEventListener)) { - interest.EndpointEventListeners.add(EndpointEventListener); + if (!interest.epListeners.contains(epListener)) { + interest.epListeners.add(epListener); } // notify listener of all known endpoints for given scope // (as EndpointEventListener contract requires of all added/modified listeners) for (EndpointDescription endpoint : interest.monitor.getEndpoints()) { EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); - notifyListeners(event, scope, Arrays.asList(EndpointEventListener)); + notifyListeners(event, scope, Arrays.asList(epListener)); } } // add scope to listener's scopes list - List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener); + List<String> scopes = epListenerScopes.get(epListener); if (scopes == null) { scopes = new ArrayList<String>(1); - EndpointEventListenerScopes.put(EndpointEventListener, scopes); + epListenerScopes.put(epListener, scopes); } if (!scopes.contains(scope)) { scopes.add(scope); @@ -128,7 +129,7 @@ public class InterfaceMonitorManager { public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) { LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener); - List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener); + List<String> scopes = epListenerScopes.get(EndpointEventListener); if (scopes == null) { return; } @@ -136,14 +137,14 @@ public class InterfaceMonitorManager { for (String scope : scopes) { Interest interest = interests.get(scope); if (interest != null) { - interest.EndpointEventListeners.remove(EndpointEventListener); - if (interest.EndpointEventListeners.isEmpty()) { + interest.epListeners.remove(EndpointEventListener); + if (interest.epListeners.isEmpty()) { interest.monitor.close(); interests.remove(scope); } } } - EndpointEventListenerScopes.remove(EndpointEventListener); + epListenerScopes.remove(EndpointEventListener); } protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) { @@ -152,30 +153,37 @@ public class InterfaceMonitorManager { @Override public void endpointChanged(EndpointEvent event, String filter) { - notifyListeners(event, scope, interest.EndpointEventListeners); + notifyListeners(event, scope, interest.epListeners); } }; return new InterfaceMonitor(zk, objClass, listener, scope); } private void notifyListeners(EndpointEvent event, String currentScope, - List<ServiceReference<EndpointEventListener>> EndpointEventListeners) { + List<ServiceReference> epListeners) { EndpointDescription endpoint = event.getEndpoint(); - for (ServiceReference<EndpointEventListener> EndpointEventListenerRef : EndpointEventListeners) { - EndpointEventListener service = bctx.getService(EndpointEventListenerRef); + for (ServiceReference<?> epListenerRef : epListeners) { + if (epListenerRef.getBundle() == null) { + LOG.info("listening service was unregistered, ignoring"); + } + Object service = bctx.getService(epListenerRef); + LOG.trace("matching {} against {}", endpoint, currentScope); + if (matchFilter(bctx, currentScope, endpoint)) { + LOG.debug("Matched {} against {}", endpoint, currentScope); try { - EndpointEventListener EndpointEventListener = (EndpointEventListener)service; - LOG.trace("matching {} against {}", endpoint, currentScope); - if (matchFilter(bctx, currentScope, endpoint)) { - LOG.debug("Matched {} against {}", endpoint, currentScope); - notifyListener(event, currentScope, EndpointEventListenerRef.getBundle(), - EndpointEventListener); + if (service instanceof EndpointEventListener) { + EndpointEventListener epeListener = (EndpointEventListener)service; + notifyListener(event, currentScope, epeListener); + } else if (service instanceof EndpointListener) { + EndpointListener epListener = (EndpointListener)service; + notifyListener(event, currentScope, epListener); } } finally { if (service != null) { - bctx.ungetService(EndpointEventListenerRef); + bctx.ungetService(epListenerRef); } } + } } } @@ -194,14 +202,28 @@ public class InterfaceMonitorManager { } - private void notifyListener(EndpointEvent event, String currentScope, - Bundle listenerBundle, EndpointEventListener listener) { + private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) { EndpointDescription endpoint = event.getEndpoint(); - if (listenerBundle == null) { - LOG.info("listening service was unregistered, ignoring"); - } else { - LOG.info("Calling endpointchanged from bundle {} for endpoint {} ", listenerBundle.getSymbolicName(), endpoint); - listener.endpointChanged(event, currentScope); + LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); + listener.endpointChanged(event, currentScope); + } + + private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) { + EndpointDescription endpoint = event.getEndpoint(); + LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); + switch (event.getType()) { + case EndpointEvent.ADDED: + listener.endpointAdded(endpoint, currentScope); + break; + + case EndpointEvent.MODIFIED: + listener.endpointAdded(endpoint, currentScope); + listener.endpointRemoved(endpoint, currentScope); + break; + + case EndpointEvent.REMOVED: + listener.endpointRemoved(endpoint, currentScope); + break; } } @@ -210,7 +232,7 @@ public class InterfaceMonitorManager { interest.monitor.close(); } interests.clear(); - EndpointEventListenerScopes.clear(); + epListenerScopes.clear(); } /** @@ -223,8 +245,8 @@ public class InterfaceMonitorManager { /** * Only for test case! */ - protected synchronized Map<ServiceReference<EndpointEventListener>, List<String>> getEndpointListenerScopes() { - return EndpointEventListenerScopes; + protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() { + return epListenerScopes; } protected List<String> getScopes(ServiceReference<?> sref) { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java index 777c11c..381ec9d 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java @@ -18,58 +18,50 @@ */ package org.apache.aries.rsa.discovery.zookeeper.publish; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.util.Dictionary; import java.util.List; import org.apache.zookeeper.ZooKeeper; import org.easymock.EasyMock; import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; -import junit.framework.TestCase; +@SuppressWarnings("deprecation") +public class PublishingEndpointListenerFactoryTest { -public class PublishingEndpointListenerFactoryTest extends TestCase { + private IMocksControl c; + private BundleContext ctx; + private ZooKeeper zk; - @SuppressWarnings("unchecked") + @Before + public void before() { + c = EasyMock.createNiceControl(); + zk = c.createMock(ZooKeeper.class); + ctx = createBundleContext(); + } + + @Test public void testScope() { - IMocksControl c = EasyMock.createNiceControl(); - - BundleContext ctx = c.createMock(BundleContext.class); - ZooKeeper zk = c.createMock(ZooKeeper.class); - @SuppressWarnings("rawtypes") - ServiceRegistration sreg = c.createMock(ServiceRegistration.class); - PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx); - EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf), - (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once(); - - EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes(); - c.replay(); eplf.start(); c.verify(); } - @SuppressWarnings("unchecked") + @Test public void testServiceFactory() { - IMocksControl c = EasyMock.createNiceControl(); - - BundleContext ctx = c.createMock(BundleContext.class); - ZooKeeper zk = c.createMock(ZooKeeper.class); - @SuppressWarnings("rawtypes") - ServiceRegistration sreg = c.createMock(ServiceRegistration.class); - PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx); - EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf), - (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once(); - - EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes(); - PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class); eli.close(); EasyMock.expectLastCall().once(); @@ -78,7 +70,6 @@ public class PublishingEndpointListenerFactoryTest extends TestCase { eplf.start(); PublishingEndpointListener service = eplf.getService(null, null); - assertNotNull(service); assertTrue(service instanceof EndpointEventListener); List<PublishingEndpointListener> listeners = eplf.getListeners(); @@ -97,4 +88,16 @@ public class PublishingEndpointListenerFactoryTest extends TestCase { c.verify(); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private BundleContext createBundleContext() { + BundleContext ctx = c.createMock(BundleContext.class); + ServiceRegistration sreg = c.createMock(ServiceRegistration.class); + String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()}; + EasyMock.expect(ctx.registerService(EasyMock.aryEq(ifAr), EasyMock.anyObject(), + (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once(); + + EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes(); + return ctx; + } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java index e2ecece..53ddbc4 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java @@ -20,6 +20,7 @@ package org.apache.aries.rsa.discovery.zookeeper.subscribe; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import java.util.Collections; @@ -29,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.data.Stat; import org.easymock.EasyMock; import org.easymock.IMocksControl; @@ -52,6 +54,8 @@ public class InterfaceMonitorTest extends TestCase { InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope); zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject()); EasyMock.expectLastCall().once(); + zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject()); + expectLastCall(); expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes(); expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
