Further refactoring TopologyManager
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/e5db6fdd Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/e5db6fdd Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/e5db6fdd Branch: refs/heads/master Commit: e5db6fdd7bf5311c0ef0eea723cceea3632f59d7 Parents: ba591ef Author: Christian Schneider <[email protected]> Authored: Thu Mar 10 17:17:21 2016 +0100 Committer: Christian Schneider <[email protected]> Committed: Thu Mar 10 17:17:21 2016 +0100 ---------------------------------------------------------------------- dsw/cxf-topology-manager/pom.xml | 11 ++ .../cxf/dosgi/topologymanager/Activator.java | 122 ++++++++++-- .../exporter/EndpointListenerNotifier.java | 117 ++++-------- .../exporter/EndpointRepository.java | 27 ++- .../topologymanager/exporter/StringPlus.java | 57 ++++++ .../exporter/TopologyManagerExport.java | 144 ++++---------- .../importer/EndpointListenerManager.java | 2 +- .../topologymanager/importer/FilterHelper.java | 43 +++++ .../importer/ListenerHookImpl.java | 3 +- .../topologymanager/importer/RSATracker.java | 26 +++ .../importer/TopologyManagerImport.java | 39 ++-- .../util/SimpleServiceTracker.java | 151 --------------- .../util/SimpleServiceTrackerListener.java | 54 ------ .../cxf/dosgi/topologymanager/util/Utils.java | 81 -------- .../exporter/EndpointListenerNotifierTest.java | 188 +++++++------------ .../exporter/ExportServiceTest.java | 143 -------------- .../exporter/TopologyManagerExportTest.java | 139 ++++++++++++++ .../importer/TopologyManagerImportTest.java | 19 +- .../util/SimpleServiceTrackerTest.java | 164 ---------------- 19 files changed, 574 insertions(+), 956 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/pom.xml ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/pom.xml b/dsw/cxf-topology-manager/pom.xml index cb91f1f..2c64d28 100644 --- a/dsw/cxf-topology-manager/pom.xml +++ b/dsw/cxf-topology-manager/pom.xml @@ -56,10 +56,21 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>1.7.16</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.easymock</groupId> <artifactId>easymockclassextension</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java index 5cf8479..46f98a7 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java @@ -18,38 +18,132 @@ */ package org.apache.cxf.dosgi.topologymanager; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier; +import org.apache.cxf.dosgi.topologymanager.exporter.EndpointRepository; import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport; import org.apache.cxf.dosgi.topologymanager.importer.TopologyManagerImport; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTracker; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.Filter; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Activator implements BundleActivator { - + private static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)"; + private static final String ENDPOINT_LISTENER_FILTER = + "(&(" + Constants.OBJECTCLASS + "=" + EndpointListener.class.getName() + ")" + + "(" + EndpointListener.ENDPOINT_LISTENER_SCOPE + "=*))"; private static final Logger LOG = LoggerFactory.getLogger(Activator.class); - private TopologyManagerExport topologyManagerExport; - private TopologyManagerImport topologyManagerImport; - private SimpleServiceTracker<RemoteServiceAdmin> rsaTracker; + private TopologyManagerExport exportManager; + private TopologyManagerImport importManager; + private EndpointListenerNotifier notifier; + private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker; + private ThreadPoolExecutor exportExecutor; + private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker; - public void start(BundleContext bc) throws Exception { + public void start(final BundleContext bc) throws Exception { LOG.debug("TopologyManager: start()"); - rsaTracker = new SimpleServiceTracker<RemoteServiceAdmin>(bc, RemoteServiceAdmin.class); - topologyManagerExport = new TopologyManagerExport(bc, rsaTracker); - topologyManagerImport = new TopologyManagerImport(bc, rsaTracker); - + EndpointRepository endpointRepo = new EndpointRepository(); + notifier = new EndpointListenerNotifier(endpointRepo); + epListenerTracker = new ServiceTracker<EndpointListener, EndpointListener>(bc, EndpointListener.class, null) { + @Override + public EndpointListener addingService(ServiceReference<EndpointListener> reference) { + EndpointListener listener = super.addingService(reference); + notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference)); + return listener; + } + + @Override + public void modifiedService(ServiceReference<EndpointListener> reference, + EndpointListener listener) { + super.modifiedService(reference, listener); + notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference)); + } + + @Override + public void removedService(ServiceReference<EndpointListener> reference, + EndpointListener listener) { + notifier.remove(listener); + super.removedService(reference, listener); + } + }; + endpointRepo.setNotifier(notifier); + exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + exportManager = new TopologyManagerExport(endpointRepo, exportExecutor); + importManager = new TopologyManagerImport(bc); + rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null); + bc.addServiceListener(exportManager); rsaTracker.open(); - topologyManagerExport.start(); - topologyManagerImport.start(); + epListenerTracker.open(); + exportExistingServices(bc); + importManager.start(); } public void stop(BundleContext bc) throws Exception { LOG.debug("TopologyManager: stop()"); - topologyManagerExport.stop(); - topologyManagerImport.stop(); + epListenerTracker.close(); + bc.removeServiceListener(exportManager); + exportExecutor.shutdown(); + importManager.stop(); rsaTracker.close(); } + + public static Filter epListenerFilter(BundleContext bctx) { + try { + return bctx.createFilter(ENDPOINT_LISTENER_FILTER); + } catch (InvalidSyntaxException e) { + throw new RuntimeException("Unexpected exception creating filter", e); + } + } + + public void exportExistingServices(BundleContext context) { + try { + // cast to String is necessary for compiling against OSGi core version >= 4.3 + ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES); + if (references != null) { + for (ServiceReference<?> sref : references) { + exportManager.export(sref); + } + } + } catch (InvalidSyntaxException e) { + LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES); + } + } + + private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> { + private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz, + ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) { + super(context, clazz, customizer); + } + + @Override + public RemoteServiceAdmin addingService(ServiceReference<RemoteServiceAdmin> reference) { + RemoteServiceAdmin rsa = super.addingService(reference); + LOG.debug("New RemoteServiceAdmin {} detected, trying to import and export services with it", rsa); + importManager.add(rsa); + exportManager.add(rsa); + return rsa; + } + + @Override + public void removedService(ServiceReference<RemoteServiceAdmin> reference, + RemoteServiceAdmin rsa) { + exportManager.remove(rsa); + importManager.remove(rsa); + super.removedService(reference, rsa); + } + } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java index e01f5dd..8f33add 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java @@ -18,18 +18,16 @@ */ package org.apache.cxf.dosgi.topologymanager.exporter; -import java.util.ArrayList; import java.util.Collection; import java.util.Dictionary; +import java.util.HashSet; import java.util.Hashtable; -import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTracker; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTrackerListener; -import org.apache.cxf.dosgi.topologymanager.util.Utils; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointDescription; @@ -41,92 +39,61 @@ import org.slf4j.LoggerFactory; * Tracks EndpointListeners and allows to notify them of endpoints. */ public class EndpointListenerNotifier { - - private static final String ENDPOINT_LISTENER_FILTER = - "(&(" + Constants.OBJECTCLASS + "=" + EndpointListener.class.getName() + ")" - + "(" + EndpointListener.ENDPOINT_LISTENER_SCOPE + "=*))"; private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class); - private BundleContext bctx; - private SimpleServiceTracker<EndpointListener> endpointListenerTracker; - - public EndpointListenerNotifier(BundleContext bctx, final EndpointRepository endpointRepository) { - this.bctx = bctx; - Filter filter; - try { - filter = bctx.createFilter(ENDPOINT_LISTENER_FILTER); - } catch (InvalidSyntaxException e) { - throw new RuntimeException("Unexpected exception creating filter", e); - } - this.endpointListenerTracker = new SimpleServiceTracker<EndpointListener>(bctx, filter); - this.endpointListenerTracker.addListener(new SimpleServiceTrackerListener<EndpointListener>() { - @Override - public void added(ServiceReference<EndpointListener> reference, EndpointListener service) { - LOG.debug("new EndpointListener detected"); - notifyListener(true, reference, endpointRepository.getAllEndpoints()); - } - - @Override - public void modified(ServiceReference<EndpointListener> reference, EndpointListener service) { - LOG.debug("EndpointListener modified"); - notifyListener(true, reference, endpointRepository.getAllEndpoints()); - } + public enum NotifyType { ADDED, REMOVED }; + private Map<EndpointListener, Set<Filter>> listeners; + private EndpointRepository endpointRepo; - @Override - public void removed(ServiceReference<EndpointListener> reference, EndpointListener service) { - } - }); + public EndpointListenerNotifier(final EndpointRepository endpointRepo) { + this.endpointRepo = endpointRepo; + this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>(); } - public void start() { - endpointListenerTracker.open(); + public void add(EndpointListener ep, Set<Filter> filters) { + LOG.debug("new EndpointListener detected"); + listeners.put(ep, filters); + notifyListener(NotifyType.ADDED, ep, filters, endpointRepo.getAllEndpoints()); } - - public void stop() { - endpointListenerTracker.close(); + + public void remove(EndpointListener ep) { + LOG.debug("EndpointListener modified"); + listeners.remove(ep); } - + /** * Notifies all endpoint listeners about endpoints being added or removed. * * @param added specifies whether endpoints were added (true) or removed (false) * @param endpoints the endpoints the listeners should be notified about */ - void notifyListeners(boolean added, Collection<EndpointDescription> endpoints) { + public void notifyListeners(NotifyType type, Collection<EndpointDescription> endpoints) { if (endpoints.isEmpty()) { // a little optimization to prevent unnecessary processing return; } - for (ServiceReference<EndpointListener> eplReference : endpointListenerTracker.getAllServiceReferences()) { - notifyListener(added, eplReference, endpoints); + for (EndpointListener listener : listeners.keySet()) { + notifyListener(type, listener, listeners.get(listener), endpoints); } } /** * Notifies an endpoint listener about endpoints being added or removed. * - * @param added specifies whether endpoints were added (true) or removed (false) + * @param type specifies whether endpoints were added (true) or removed (false) * @param endpointListenerRef the ServiceReference of an EndpointListener to notify * @param endpoints the endpoints the listener should be notified about */ - void notifyListener(boolean added, ServiceReference<EndpointListener> endpointListenerRef, - Collection<EndpointDescription> endpoints) { - List<Filter> filters = getFiltersFromEndpointListenerScope(endpointListenerRef, bctx); - EndpointListener endpointListener = bctx.getService(endpointListenerRef); - try { - LOG.debug("notifyListener (added={})", added); - for (EndpointDescription endpoint : endpoints) { - List<Filter> matchingFilters = getMatchingFilters(filters, endpoint); - for (Filter filter : matchingFilters) { - if (added) { - endpointListener.endpointAdded(endpoint, filter.toString()); - } else { - endpointListener.endpointRemoved(endpoint, filter.toString()); - } + void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, + Collection<EndpointDescription> endpoints) { + LOG.debug("Endpoint {}", type); + for (EndpointDescription endpoint : endpoints) { + Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint); + for (Filter filter : matchingFilters) { + if (type == NotifyType.ADDED) { + listener.endpointAdded(endpoint, filter.toString()); + } else { + listener.endpointRemoved(endpoint, filter.toString()); } } - } finally { - if (endpointListener != null) { - bctx.ungetService(endpointListenerRef); - } } } @@ -136,7 +103,7 @@ public class EndpointListenerNotifier { * @param endpoint an endpoint description * @return endpoint properties (will never return null) */ - public static Dictionary<String, Object> getEndpointProperties(EndpointDescription endpoint) { + private static Dictionary<String, Object> getEndpointProperties(EndpointDescription endpoint) { if (endpoint == null || endpoint.getProperties() == null) { return new Hashtable<String, Object>(); } else { @@ -144,13 +111,12 @@ public class EndpointListenerNotifier { } } - static List<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref, - BundleContext bctx) { - List<Filter> filters = new ArrayList<Filter>(); - String[] scopes = Utils.getStringPlusProperty(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)); + public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) { + Set<Filter> filters = new HashSet<Filter>(); + String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)); for (String scope : scopes) { try { - filters.add(bctx.createFilter(scope)); + filters.add(FrameworkUtil.createFilter(scope)); } catch (InvalidSyntaxException e) { LOG.error("invalid endpoint listener scope: {}", scope, e); } @@ -158,10 +124,9 @@ public class EndpointListenerNotifier { return filters; } - private static List<Filter> getMatchingFilters(List<Filter> filters, EndpointDescription endpoint) { - List<Filter> matchingFilters = new ArrayList<Filter>(); + private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) { + Set<Filter> matchingFilters = new HashSet<Filter>(); Dictionary<String, Object> dict = EndpointListenerNotifier.getEndpointProperties(endpoint); - for (Filter filter : filters) { if (filter.match(dict)) { LOG.debug("Filter {} matches endpoint {}", filter, dict); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java index bdcdda5..7984822 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.cxf.dosgi.topologymanager.util.Utils; +import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; @@ -39,20 +39,28 @@ import org.slf4j.LoggerFactory; * endpoints. */ @SuppressWarnings("rawtypes") -class EndpointRepository { +public class EndpointRepository { private static final Logger LOG = LoggerFactory.getLogger(EndpointRepository.class); private final Map<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> exportedServices = new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>>(); + private EndpointListenerNotifier notifier; + + public void setNotifier(EndpointListenerNotifier notifier) { + this.notifier = notifier; + } + + /** * Remove all services exported by the given rsa. * * @param rsa the RemoteServiceAdmin to remove * @return list of removed endpoints */ - synchronized List<EndpointDescription> removeRemoteServiceAdmin(RemoteServiceAdmin rsa) { + public synchronized List<EndpointDescription> removeRemoteServiceAdmin(RemoteServiceAdmin rsa) { + LOG.debug("RemoteServiceAdmin removed: {}", rsa.getClass().getName()); List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>(); for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports : exportedServices.values()) { Collection<EndpointDescription> endpoints = exports.get(rsa); @@ -61,10 +69,11 @@ class EndpointRepository { exports.remove(rsa); } } + notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints); return removedEndpoints; } - synchronized List<EndpointDescription> removeService(ServiceReference sref) { + synchronized void removeService(ServiceReference sref) { List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>(); Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsas = exportedServices.get(sref); if (rsas != null) { @@ -73,12 +82,12 @@ class EndpointRepository { } exportedServices.remove(sref); } - return removedEndpoints; + notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints); } synchronized void addService(ServiceReference sref) { if (!exportedServices.containsKey(sref)) { - LOG.info("Marking service from bundle {} for export", Utils.getBundleName(sref)); + LOG.info("Marking service from bundle {} for export", sref.getBundle().getSymbolicName()); exportedServices.put(sref, new LinkedHashMap<RemoteServiceAdmin, Collection<EndpointDescription>>()); } } @@ -91,6 +100,7 @@ class EndpointRepository { addService(sref); Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports = exportedServices.get(sref); exports.put(rsa, endpoints); + notifier.notifyListeners(NotifyType.ADDED, endpoints); } synchronized boolean isAlreadyExportedForRsa(ServiceReference sref, RemoteServiceAdmin rsa) { @@ -98,7 +108,7 @@ class EndpointRepository { return exports != null && exports.containsKey(rsa); } - synchronized Collection<EndpointDescription> getAllEndpoints() { + public synchronized Collection<EndpointDescription> getAllEndpoints() { List<EndpointDescription> allEndpoints = new ArrayList<EndpointDescription>(); for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports : exportedServices.values()) { for (Collection<EndpointDescription> endpoints : exports.values()) { @@ -108,7 +118,7 @@ class EndpointRepository { return allEndpoints; } - synchronized Set<ServiceReference> getServicesToBeExportedFor(RemoteServiceAdmin rsa) { + public synchronized Set<ServiceReference> getServicesToBeExportedFor(RemoteServiceAdmin rsa) { Set<ServiceReference> servicesToBeExported = new HashSet<ServiceReference>(); for (Map.Entry<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> entry : exportedServices.entrySet()) { @@ -118,4 +128,5 @@ class EndpointRepository { } return servicesToBeExported; } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/StringPlus.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/StringPlus.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/StringPlus.java new file mode 100644 index 0000000..1198154 --- /dev/null +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/StringPlus.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.exporter; + +import java.util.Collection; + +public final class StringPlus { + + private StringPlus() { + } + + /** + * Returns the value of a "string+" property as an array of strings. + * <p> + * A "string+" property can have a value which is either a string, + * an array of strings, or a collection of strings. + * <p> + * If the given value is not of one of the valid types, or is null, + * an empty array is returned. + * + * @param property a "string+" property value + * @return the property value as an array of strings, or an empty array + */ + public static String[] parse(Object property) { + if (property instanceof String) { + return new String[] {(String)property}; + } else if (property instanceof String[]) { + return (String[])property; + } else if (property instanceof Collection) { + try { + @SuppressWarnings("unchecked") + Collection<String> strings = (Collection<String>)property; + return strings.toArray(new String[strings.size()]); + } catch (ArrayStoreException ase) { + // ignore collections with wrong type + } + } + return new String[0]; + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java index 446ab07..d7db57f 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java @@ -20,17 +20,11 @@ package org.apache.cxf.dosgi.topologymanager.exporter; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.Set; +import java.util.concurrent.Executor; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTracker; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTrackerListener; -import org.apache.cxf.dosgi.topologymanager.util.Utils; -import org.osgi.framework.BundleContext; -import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceEvent; import org.osgi.framework.ServiceListener; import org.osgi.framework.ServiceReference; @@ -53,71 +47,32 @@ import org.slf4j.LoggerFactory; * <li> When a service is unpublished the EndpointListeners are notified. * The endpoints are not closed as the ExportRegistration takes care of this */ -public class TopologyManagerExport { - - private static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)"; - +public class TopologyManagerExport implements ServiceListener { private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerExport.class); - private final BundleContext bctx; - private final EndpointListenerNotifier epListenerNotifier; - private final ExecutorService execService; - private final SimpleServiceTracker<RemoteServiceAdmin> remoteServiceAdminTracker; - private final ServiceListener serviceListener; + private final Executor execService; private final EndpointRepository endpointRepo; + private final Set<RemoteServiceAdmin> rsaSet; - public TopologyManagerExport(BundleContext ctx, SimpleServiceTracker<RemoteServiceAdmin> rsaTracker) { - this(ctx, rsaTracker, null); + public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor) { + this.endpointRepo = endpointRepo; + this.rsaSet = new HashSet<RemoteServiceAdmin>(); + this.execService = executor; } - public TopologyManagerExport(BundleContext ctx, SimpleServiceTracker<RemoteServiceAdmin> rsaTracker, - EndpointListenerNotifier notif) { - endpointRepo = new EndpointRepository(); - epListenerNotifier = notif == null ? new EndpointListenerNotifier(ctx, endpointRepo) : notif; - execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - bctx = ctx; - remoteServiceAdminTracker = rsaTracker; - - // track RemoteServiceAdmins through which we can export services - remoteServiceAdminTracker.addListener(new SimpleServiceTrackerListener<RemoteServiceAdmin>() { - - public void added(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { - LOG.debug("RemoteServiceAdmin added: {}, total {}", - rsa, remoteServiceAdminTracker.getAllServices().size()); - for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) { - triggerExport(serviceRef); - } - } - - public void modified(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { + // track all service registrations so we can export any services that are configured to be exported + // ServiceListener events may be delivered out of order, concurrently, re-entrant, etc. (see spec or docs) + public void serviceChanged(ServiceEvent event) { + ServiceReference<?> sref = event.getServiceReference(); + if (event.getType() == ServiceEvent.REGISTERED) { + LOG.debug("Received REGISTERED ServiceEvent: {}", event); + if (shouldExportService(sref)) { + export(sref); } - - public void removed(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { - LOG.debug("RemoteServiceAdmin removed: {}, total {}", rsa, - remoteServiceAdminTracker.getAllServices().size()); - List<EndpointDescription> endpoints = endpointRepo.removeRemoteServiceAdmin(rsa); - epListenerNotifier.notifyListeners(false, endpoints); - } - }); - - // track all service registrations so we can export any services that are configured to be exported - // ServiceListener events may be delivered out of order, concurrently, re-entrant, etc. (see spec or docs) - serviceListener = new ServiceListener() { - - public void serviceChanged(ServiceEvent event) { - ServiceReference<?> sref = event.getServiceReference(); - if (event.getType() == ServiceEvent.REGISTERED) { - LOG.debug("Received REGISTERED ServiceEvent: {}", event); - if (shouldExportService(sref)) { - triggerExport(sref); - } - } else if (event.getType() == ServiceEvent.UNREGISTERING) { - LOG.debug("Received UNREGISTERING ServiceEvent: {}", event); - List<EndpointDescription> endpoints = endpointRepo.removeService(sref); - epListenerNotifier.notifyListeners(false, endpoints); - } - } - }; + } else if (event.getType() == ServiceEvent.UNREGISTERING) { + LOG.debug("Received UNREGISTERING ServiceEvent: {}", event); + endpointRepo.removeService(sref); + } } /** @@ -127,19 +82,19 @@ public class TopologyManagerExport { return sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES) != null; } - public void start() { - epListenerNotifier.start(); - bctx.addServiceListener(serviceListener); - exportExistingServices(); - } - - public void stop() { - execService.shutdown(); - bctx.removeServiceListener(serviceListener); - epListenerNotifier.stop(); - } - - protected void triggerExport(final ServiceReference<?> sref) { + public void add(RemoteServiceAdmin rsa) { + rsaSet.add(rsa); + for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) { + export(serviceRef); + } + }; + + public void remove(RemoteServiceAdmin rsa) { + rsaSet.remove(rsa); + endpointRepo.removeRemoteServiceAdmin(rsa); + }; + + public void export(final ServiceReference<?> sref) { execService.execute(new Runnable() { public void run() { try { @@ -154,16 +109,15 @@ public class TopologyManagerExport { protected void doExportService(final ServiceReference<?> sref) { LOG.debug("Exporting service {}", sref); endpointRepo.addService(sref); // mark for future export even if there are currently no RSAs - List<RemoteServiceAdmin> rsaList = remoteServiceAdminTracker.getAllServices(); - if (rsaList.isEmpty()) { + if (rsaSet.size() == 0) { LOG.error("No RemoteServiceAdmin available! Unable to export service from bundle {}, interfaces: {}", - Utils.getBundleName(sref), + sref.getBundle().getSymbolicName(), sref.getProperty(org.osgi.framework.Constants.OBJECTCLASS)); + return; } - for (final RemoteServiceAdmin remoteServiceAdmin : rsaList) { + for (RemoteServiceAdmin remoteServiceAdmin : rsaSet) { LOG.info("TopologyManager: handling remoteServiceAdmin " + remoteServiceAdmin); - if (endpointRepo.isAlreadyExportedForRsa(sref, remoteServiceAdmin)) { // already handled by this remoteServiceAdmin LOG.debug("already handled by this remoteServiceAdmin -> skipping"); @@ -186,10 +140,6 @@ public class TopologyManagerExport { LOG.debug("exporting {}...", sref); // TODO: additional parameter Map? Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, null); - if (exportRegs.isEmpty()) { - LOG.warn("TopologyManager: nothing was exported for {}", sref); - return; - } // process successful/failed registrations List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); for (ExportRegistration reg : exportRegs) { @@ -198,7 +148,6 @@ public class TopologyManagerExport { LOG.info("TopologyManager: export succeeded for {}, endpoint ", sref, endpoint); endpoints.add(endpoint); } else { - // TODO: what should we do with failed exports? LOG.error("TopologyManager: export failed for {}", sref); reg.close(); } @@ -217,7 +166,6 @@ public class TopologyManagerExport { if (!endpoints.isEmpty()) { LOG.info("TopologyManager: export successful for {}, endpoints: {}", sref, endpoints); endpointRepo.addEndpoints(sref, remoteServiceAdmin, endpoints); - epListenerNotifier.notifyListeners(true, endpoints); // notify after endpoints are added to repo } } @@ -232,17 +180,5 @@ public class TopologyManagerExport { return (ref == null) ? null : ref.getExportedEndpoint(); } - private void exportExistingServices() { - try { - // cast to String is necessary for compiling against OSGi core version >= 4.3 - ServiceReference<?>[] references = bctx.getServiceReferences((String)null, DOSGI_SERVICES); - if (references != null) { - for (ServiceReference<?> sref : references) { - triggerExport(sref); - } - } - } catch (InvalidSyntaxException e) { - LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES); - } - } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java index f222606..7812e52 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java @@ -41,7 +41,7 @@ public class EndpointListenerManager { private final List<String> filters = new ArrayList<String>(); private final EndpointListener endpointListener; - protected EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) { + public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) { this.bctx = bc; this.endpointListener = endpointListener; } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/FilterHelper.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/FilterHelper.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/FilterHelper.java new file mode 100644 index 0000000..3739f16 --- /dev/null +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/FilterHelper.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.osgi.framework.Constants; + +public final class FilterHelper { + private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([a-zA-Z_0-9.]+)\\).*"; + private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION); + + private FilterHelper() { + // prevent instantiation + } + + public static String getObjectClass(String filter) { + if (filter != null) { + Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter); + if (matcher.matches() && matcher.groupCount() >= 1) { + return matcher.group(1); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java index f01754d..6d0bee1 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; -import org.apache.cxf.dosgi.topologymanager.util.Utils; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.framework.hooks.service.ListenerHook; @@ -64,7 +63,7 @@ public class ListenerHookImpl implements ListenerHook { for (ListenerInfo listenerInfo : listeners) { LOG.debug("Filter {}", listenerInfo.getFilter()); - String className = Utils.getObjectClass(listenerInfo.getFilter()); + String className = FilterHelper.getObjectClass(listenerInfo.getFilter()); if (listenerInfo.getBundleContext().getBundle().equals(bctx.getBundle())) { LOG.debug("ListenerHookImpl: skipping request from myself"); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/RSATracker.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/RSATracker.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/RSATracker.java new file mode 100644 index 0000000..56e98e8 --- /dev/null +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/RSATracker.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; + +public interface RSATracker { + void added(RemoteServiceAdmin rsa); + void removed(RemoteServiceAdmin rsa); +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java index 366d391..30fe4c2 100644 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java +++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java @@ -22,18 +22,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTracker; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTrackerListener; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; import org.osgi.framework.hooks.service.ListenerHook; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointListener; @@ -58,7 +57,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm private final EndpointListenerManager endpointListenerManager; private final BundleContext bctx; - private final SimpleServiceTracker<RemoteServiceAdmin> remoteServiceAdminTracker; + private Set<RemoteServiceAdmin> rsaSet; private final ListenerHookImpl listenerHook; /** @@ -88,28 +87,16 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm */ private final Map<String /* filter */, List<ImportRegistration>> importedServices = new HashMap<String, List<ImportRegistration>>(); + - public TopologyManagerImport(BundleContext bc, SimpleServiceTracker<RemoteServiceAdmin> rsaTracker) { + public TopologyManagerImport(BundleContext bc) { + this.rsaSet = new HashSet<RemoteServiceAdmin>(); bctx = bc; - remoteServiceAdminTracker = rsaTracker; - remoteServiceAdminTracker.addListener(new SimpleServiceTrackerListener<RemoteServiceAdmin>() { - - public void added(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { - triggerImportsForRemoteServiceAdmin(rsa); - } - - public void modified(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { - } - - public void removed(ServiceReference<RemoteServiceAdmin> reference, RemoteServiceAdmin rsa) { - // the RSA's imports will be closed by its shutdown, so nothing to do here - } - }); endpointListenerManager = new EndpointListenerManager(bctx, this); execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); listenerHook = new ListenerHookImpl(bc, this); } - + public void start() { bctx.registerService(RemoteServiceAdminListener.class, this, null); bctx.registerService(ListenerHook.class, listenerHook, null); @@ -192,14 +179,19 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm } } - public void triggerImportsForRemoteServiceAdmin(RemoteServiceAdmin rsa) { - LOG.debug("New RemoteServiceAdmin {} detected, trying to import services with it", rsa); + public void add(RemoteServiceAdmin rsa) { + rsaSet.add(rsa); synchronized (importPossibilities) { for (String filter : importPossibilities.keySet()) { triggerImport(filter); } } } + + public void remove(RemoteServiceAdmin rsa) { + rsaSet.remove(rsa); + } + private void triggerImport(final String filter) { LOG.debug("Import of a service for filter {} was queued", filter); @@ -292,7 +284,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm * @return import registration of the first successful import */ private ImportRegistration importService(EndpointDescription endpoint) { - for (RemoteServiceAdmin rsa : remoteServiceAdminTracker.getAllServices()) { + for (RemoteServiceAdmin rsa : rsaSet) { ImportRegistration ir = rsa.importService(endpoint); if (ir != null) { if (ir.getException() == null) { @@ -349,4 +341,5 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm removeImport(null, event.getImportReference()); } } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTracker.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTracker.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTracker.java deleted file mode 100644 index fc5b35e..0000000 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTracker.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.Filter; -import org.osgi.framework.ServiceReference; -import org.osgi.util.tracker.ServiceTracker; - -/** - * A {@link ServiceTracker} extension that simplifies its usage. - * <p> - * It enhances {@code ServiceTracker} by adding: - * <ul> - * <li>Multiple event listeners for service add/remove events - * <li>Simpler event callbacks that do not need to deal with getting/ungetting - * services, calling super methods or returning service objects - * <li>Generics support, which means the callback and {@code getList()} methods - * are type-safe and require no casting - * <li>A {@link #getAllServices()} method which returns all currently tracked services; - * Unlike {@link ServiceTracker#getServices()}, if it is called from within a service - * {@link SimpleServiceTrackerListener#added added} event handler, the returned list - * will include the newly added service (this is the source of several bugs when using - * the original {@code getServices()}) - * </ul> - * - * @param <T> the service interface type - */ -public class SimpleServiceTracker<T> extends ServiceTracker<T, T> { - - // we must use a map with references as keys, so as not to invoke equals remotely on service objects - private final ConcurrentMap<ServiceReference<T>, T> services = new ConcurrentHashMap<ServiceReference<T>, T>(); - private final List<SimpleServiceTrackerListener<T>> listeners = - new CopyOnWriteArrayList<SimpleServiceTrackerListener<T>>(); - - /** - * Create a {@code SimpleServiceTracker} on the specified class name. - * <p> - * Services registered under the specified class name will be tracked by - * this {@code SimpleServiceTracker}. - * - * @param context the {@code BundleContext} against which the tracking is done - * @param clazz the class of the services to be tracked - */ - public SimpleServiceTracker(BundleContext context, Class<T> clazz) { - super(context, clazz.getName(), null); - } - - /** - * Create a {@code SimpleServiceTracker} on the specified {@code Filter} object. - * <p> - * Services which match the specified {@code Filter} object will be tracked by - * this {@code SimpleServiceTracker}. - * - * @param context the {@code BundleContext} against which the tracking is done - * @param filter The {@code Filter} to select the services to be tracked - */ - public SimpleServiceTracker(BundleContext context, Filter filter) { - super(context, filter, null); - } - - /** - * Adds a listener to be notified of services added or removed. - * - * @param listener the listener to add - */ - public void addListener(SimpleServiceTrackerListener<T> listener) { - listeners.add(listener); - } - - @Override - public T addingService(ServiceReference<T> reference) { - T service = super.addingService(reference); - services.put(reference, service); - for (SimpleServiceTrackerListener<T> listener : listeners) { - listener.added(reference, service); - } - return service; - } - - @Override - public void modifiedService(ServiceReference<T> reference, T service) { - for (SimpleServiceTrackerListener<T> listener : listeners) { - listener.modified(reference, service); - } - super.modifiedService(reference, service); - } - - @Override - public void removedService(ServiceReference<T> reference, T service) { - services.remove(reference, service); - for (SimpleServiceTrackerListener<T> listener : listeners) { - listener.removed(reference, service); - } - super.removedService(reference, service); - } - - @Override - public void close() { - super.close(); - services.clear(); - } - - /** - * Returns all currently tracked services. - * <p> - * Unlike {@link ServiceTracker#getServices()}, if it is called from within a service - * {@link SimpleServiceTrackerListener#added added} event handler, the returned list - * will include the newly added service. - * - * @return all currently tracked services - */ - public List<T> getAllServices() { - return new ArrayList<T>(services.values()); - } - - /** - * Returns all currently tracked service references. - * <p> - * Unlike {@link ServiceTracker#getServiceReferences()}, if it is called from within a service - * {@link SimpleServiceTrackerListener#added added} event handler, the returned list - * will include the newly added service reference. - * - * @return all currently tracked service references - */ - public List<ServiceReference<T>> getAllServiceReferences() { - return new ArrayList<ServiceReference<T>>(services.keySet()); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTrackerListener.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTrackerListener.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTrackerListener.java deleted file mode 100644 index ef67b61..0000000 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/SimpleServiceTrackerListener.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.util; - -import org.osgi.framework.ServiceReference; - -/** - * Callback interface for notifications of services that are - * added to or removed from tracking by a {@link SimpleServiceTracker}. - * - * @param <T> the service interface type - */ -public interface SimpleServiceTrackerListener<T> { - - /** - * Called when a new service is added to the tracked services. - * - * @param reference the newly added service reference - * @param service the newly added service - */ - void added(ServiceReference<T> reference, T service); - - /** - * Called when a tracked service is modified. - * - * @param reference the modified service reference - * @param service the modified service - */ - void modified(ServiceReference<T> reference, T service); - - /** - * Called when a service is removed from the tracked services. - * - * @param reference the removed service reference - * @param service the removed service - */ - void removed(ServiceReference<T> reference, T service); -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/Utils.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/Utils.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/Utils.java deleted file mode 100644 index 021cc55..0000000 --- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/util/Utils.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.util; - -import java.util.Collection; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.osgi.framework.Bundle; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceReference; - -public final class Utils { - - private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([a-zA-Z_0-9.]+)\\).*"; - private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION); - - private Utils() { - // prevent instantiation - } - - public static String getObjectClass(String filter) { - if (filter != null) { - Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter); - if (matcher.matches() && matcher.groupCount() >= 1) { - return matcher.group(1); - } - } - return null; - } - - /** - * Returns the value of a "string+" property as an array of strings. - * <p> - * A "string+" property can have a value which is either a string, - * an array of strings, or a collection of strings. - * <p> - * If the given value is not of one of the valid types, or is null, - * an empty array is returned. - * - * @param property a "string+" property value - * @return the property value as an array of strings, or an empty array - */ - public static String[] getStringPlusProperty(Object property) { - if (property instanceof String) { - return new String[] {(String)property}; - } else if (property instanceof String[]) { - return (String[])property; - } else if (property instanceof Collection) { - try { - @SuppressWarnings("unchecked") - Collection<String> strings = (Collection<String>)property; - return strings.toArray(new String[strings.size()]); - } catch (ArrayStoreException ase) { - // ignore collections with wrong type - } - } - return new String[0]; - } - - public static String getBundleName(ServiceReference<?> sref) { - Bundle bundle = sref.getBundle(); - return bundle == null ? "<unregistered>" : bundle.getSymbolicName(); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java index b5bd5d1..0e60d3e 100644 --- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java +++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java @@ -18,25 +18,33 @@ */ package org.apache.cxf.dosgi.topologymanager.exporter; -import java.util.ArrayList; + + +import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; +import java.util.Dictionary; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType; import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IMocksControl; +import org.junit.Assert; import org.junit.Test; -import org.osgi.framework.BundleContext; import org.osgi.framework.Filter; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; @SuppressWarnings({ "rawtypes", "unchecked" @@ -45,142 +53,80 @@ public class EndpointListenerNotifierTest { @Test public void testNotifyListenersOfRemovalIfAppropriate() throws InvalidSyntaxException { - IMocksControl c = EasyMock.createNiceControl(); + EndpointDescription endpoint1 = createEndpoint("myClass"); + EndpointDescription endpoint2 = createEndpoint("notMyClass"); - BundleContext bc = c.createMock(BundleContext.class); - ServiceReference sref = c.createMock(ServiceReference.class); + // Expect listener to be called for endpoint1 but not for endpoint2 EndpointListener epl = EasyMock.createMock(EndpointListener.class); - EndpointDescription endpoint = c.createMock(EndpointDescription.class); - EndpointDescription endpoint2 = c.createMock(EndpointDescription.class); - - Map<String, Object> props = new HashMap<String, Object>(); - String[] oc = new String[1]; - oc[0] = "myClass"; - props.put("objectClass", oc); - - Map<String, Object> props2 = new HashMap<String, Object>(); - oc = new String[1]; - oc[0] = "notMyClass"; - props2.put("objectClass", oc); - - EasyMock.expect(bc.getService(EasyMock.eq(sref))).andReturn(epl).anyTimes(); - EasyMock.expect(bc.createFilter((String)EasyMock.anyObject())).andAnswer(new IAnswer<Filter>() { - public Filter answer() throws Throwable { - return FrameworkUtil.createFilter((String)EasyMock.getCurrentArguments()[0]); - } - }).anyTimes(); - EasyMock.expect(sref.getProperty(EasyMock.eq(EndpointListener.ENDPOINT_LISTENER_SCOPE))) - .andReturn("(objectClass=myClass)").anyTimes(); - - EasyMock.expect(endpoint.getProperties()).andReturn(props).anyTimes(); - EasyMock.expect(endpoint2.getProperties()).andReturn(props2).anyTimes(); - - // must only be called for the first EndpointDescription! - epl.endpointRemoved(EasyMock.eq(endpoint), EasyMock.eq("(objectClass=myClass)")); + epl.endpointRemoved(EasyMock.eq(endpoint1), EasyMock.eq("(objectClass=myClass)")); EasyMock.expectLastCall().once(); - - EndpointRepository exportRepository = EasyMock.createMock(EndpointRepository.class); - - c.replay(); EasyMock.replay(epl); - EndpointListenerNotifier tm = new EndpointListenerNotifier(bc, exportRepository); - - List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); - endpoints.add(endpoint); - endpoints.add(endpoint2); + EndpointRepository exportRepository = new EndpointRepository(); + EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository); - tm.notifyListener(false, sref, endpoints); + List<EndpointDescription> endpoints = Arrays.asList(endpoint1, endpoint2); + Set<Filter> filters = new HashSet<Filter>(); + filters.add(FrameworkUtil.createFilter("(objectClass=myClass)")); + tm.notifyListener(NotifyType.REMOVED, epl, filters, endpoints); - c.verify(); EasyMock.verify(epl); } + + public EndpointDescription createEndpoint(String iface) { + Map<String, Object> props = new Hashtable<String, Object>(); + props.put("objectClass", new String[]{iface}); + props.put(RemoteConstants.ENDPOINT_ID, iface); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any"); + return new EndpointDescription(props); + } @Test public void testNormalizeScopeForSingleString() { - try { - ServiceReference sr = EasyMock.createMock(ServiceReference.class); - EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)) - .andReturn("Filterstring"); - - Filter f = EasyMock.createNiceMock(Filter.class); - - BundleContext bc = EasyMock.createNiceMock(BundleContext.class); - EasyMock.expect(bc.createFilter((String)EasyMock.anyObject())).andReturn(f); - - EasyMock.replay(sr); - EasyMock.replay(bc); - - List<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr, bc); - - assertEquals(1, res.size()); - assertEquals(f, res.get(0)); - - EasyMock.verify(sr); - EasyMock.verify(bc); - } catch (InvalidSyntaxException e) { - e.printStackTrace(); - } + ServiceReference sr = createListenerServiceWithFilter("(myProp=A)"); + Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + assertEquals(1, res.size()); + Filter filter = res.iterator().next(); + filterMatches(filter); } @Test public void testNormalizeScopeForStringArray() { - try { - String[] filterStrings = {"f1", "f2", "f3"}; - - ServiceReference sr = EasyMock.createMock(ServiceReference.class); - EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)) - .andReturn(filterStrings); - - Filter f = EasyMock.createNiceMock(Filter.class); - - BundleContext bc = EasyMock.createNiceMock(BundleContext.class); - EasyMock.expect(bc.createFilter((String)EasyMock.anyObject())).andReturn(f).times(filterStrings.length); - - EasyMock.replay(sr); - EasyMock.replay(bc); - - List<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr, bc); - - assertEquals(filterStrings.length, res.size()); - assertEquals(f, res.get(0)); - - EasyMock.verify(sr); - EasyMock.verify(bc); - } catch (InvalidSyntaxException e) { - e.printStackTrace(); - } + String[] filters = {"(myProp=A)", "(otherProp=B)"}; + ServiceReference sr = createListenerServiceWithFilter(filters); + Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + assertEquals(filters.length, res.size()); + Iterator<Filter> it = res.iterator(); + Filter filter1 = it.next(); + Filter filter2 = it.next(); + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + assertThat(filter1.match(props) || filter2.match(props), is(true)); } @Test public void testNormalizeScopeForCollection() { - try { - Collection<String> collection = new ArrayList<String>(); - collection.add("f1"); - collection.add("f2"); - collection.add("f3"); - - ServiceReference sr = EasyMock.createMock(ServiceReference.class); - EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)) - .andReturn(collection); - - Filter f = EasyMock.createNiceMock(Filter.class); - - BundleContext bc = EasyMock.createNiceMock(BundleContext.class); - EasyMock.expect(bc.createFilter((String)EasyMock.anyObject())).andReturn(f).times(collection.size()); - - EasyMock.replay(sr); - EasyMock.replay(bc); - - List<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr, bc); - - assertEquals(collection.size(), res.size()); - assertEquals(f, res.get(0)); + Collection<String> collection = Arrays.asList("(myProp=A)", "(otherProp=B)"); + ServiceReference sr = createListenerServiceWithFilter(collection); + Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + Iterator<Filter> it = res.iterator(); + Filter filter1 = it.next(); + Filter filter2 = it.next(); + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + Assert.assertThat(filter1.match(props) || filter2.match(props), is(true)); + } + + private void filterMatches(Filter filter) { + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + Assert.assertTrue("Filter should match", filter.match(props)); + } - EasyMock.verify(sr); - EasyMock.verify(bc); - } catch (InvalidSyntaxException e) { - e.printStackTrace(); - } + private ServiceReference createListenerServiceWithFilter(Object filters) { + ServiceReference sr = EasyMock.createMock(ServiceReference.class); + EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)).andReturn(filters); + EasyMock.replay(sr); + return sr; } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/ExportServiceTest.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/ExportServiceTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/ExportServiceTest.java deleted file mode 100644 index cb4c407..0000000 --- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/ExportServiceTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.exporter; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTracker; -import org.apache.cxf.dosgi.topologymanager.util.SimpleServiceTrackerListener; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IMocksControl; -import org.junit.Test; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceEvent; -import org.osgi.framework.ServiceListener; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.ExportReference; -import org.osgi.service.remoteserviceadmin.ExportRegistration; -import org.osgi.service.remoteserviceadmin.RemoteConstants; -import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; - -@SuppressWarnings({ - "rawtypes", "unchecked" - }) -public class ExportServiceTest { - - /** - * This tests if the topology manager handles a service marked to be - * exported correctly by exporting it to an available RemoteServiceAdmin - * and notifying an EndpointListener afterwards. - * - * @throws Exception - */ - @Test - public void testServiceExport() throws Exception { - IMocksControl c = EasyMock.createControl(); - - BundleContext bctx = c.createMock(BundleContext.class); - RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); - final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class); - mockEpListenerNotifier.start(); - EasyMock.expectLastCall().once(); - - final ServiceReference sref = createUserServiceBundle(c); - - EasyMock - .expect(bctx.getServiceReferences(EasyMock.<String> anyObject(), EasyMock.<String> anyObject())) - .andReturn(null).atLeastOnce(); - - SimpleServiceTracker<RemoteServiceAdmin> rsaTracker = createSingleRsaTracker(c, rsa); - - EndpointDescription endpoint = createEndpoint(c); - ExportRegistration exportRegistration = createExportRegistration(c, endpoint); - - // Main assertions - simulateUserServicePublished(bctx, sref); - EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject())) - .andReturn(Collections.singletonList(exportRegistration)).once(); - mockEpListenerNotifier.notifyListeners(EasyMock.eq(true), EasyMock.eq(Collections.singletonList(endpoint))); - EasyMock.expectLastCall().once(); - - c.replay(); - - TopologyManagerExport topManager = new TopologyManagerExport(bctx, rsaTracker, mockEpListenerNotifier) { - // override to perform export from the same thread rather than asynchronously - @Override - protected void triggerExport(ServiceReference sref) { - doExportService(sref); - } - }; - topManager.start(); - c.verify(); - } - - private void simulateUserServicePublished(BundleContext bctx, final ServiceReference sref) { - bctx.addServiceListener((ServiceListener)EasyMock.anyObject()); - EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { - public Object answer() throws Throwable { - System.out.println("Simulating publishing the user service"); - ServiceListener sl = (ServiceListener)EasyMock.getCurrentArguments()[0]; - ServiceEvent se = new ServiceEvent(ServiceEvent.REGISTERED, sref); - sl.serviceChanged(se); - return null; - } - }).once(); - } - - private SimpleServiceTracker<RemoteServiceAdmin> createSingleRsaTracker(IMocksControl c, RemoteServiceAdmin rsa) { - SimpleServiceTracker<RemoteServiceAdmin> rsaTracker = c.createMock(SimpleServiceTracker.class); - rsaTracker.addListener(EasyMock.<SimpleServiceTrackerListener> anyObject()); - EasyMock.expectLastCall().once(); - EasyMock.expect(rsaTracker.getAllServices()).andReturn(Collections.singletonList(rsa)); - return rsaTracker; - } - - private ExportRegistration createExportRegistration(IMocksControl c, EndpointDescription endpoint) { - ExportRegistration exportRegistration = c.createMock(ExportRegistration.class); - ExportReference exportReference = c.createMock(ExportReference.class); - EasyMock.expect(exportRegistration.getExportReference()).andReturn(exportReference).anyTimes(); - EasyMock.expect(exportRegistration.getException()).andReturn(null).anyTimes(); - EasyMock.expect(exportReference.getExportedEndpoint()).andReturn(endpoint).anyTimes(); - return exportRegistration; - } - - private EndpointDescription createEndpoint(IMocksControl c) { - Map<String, Object> props = new HashMap<String, Object>(); - props.put(RemoteConstants.ENDPOINT_ID, "1"); - props.put(Constants.OBJECTCLASS, new String[] {"abc"}); - props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "cxf"); - return new EndpointDescription(props); - } - - private ServiceReference createUserServiceBundle(IMocksControl c) { - final ServiceReference sref = c.createMock(ServiceReference.class); - EasyMock.expect(sref.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES))) - .andReturn("*").anyTimes(); - Bundle srefBundle = c.createMock(Bundle.class); - EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce(); - EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce(); - return sref; - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/e5db6fdd/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java ---------------------------------------------------------------------- diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java new file mode 100644 index 0000000..cb9dc36 --- /dev/null +++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.exporter; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; + +import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.Bundle; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.ExportReference; +import org.osgi.service.remoteserviceadmin.ExportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class TopologyManagerExportTest { + + /** + * This tests if the topology manager handles a service marked to be exported correctly by exporting it to + * an available RemoteServiceAdmin and notifying an EndpointListener afterwards. + * + * @throws Exception + */ + @Test + public void testServiceExportUnexport() throws Exception { + IMocksControl c = EasyMock.createControl(); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class); + final ServiceReference sref = createUserService(c); + EndpointDescription epd = createEndpoint(); + expectServiceExported(c, rsa, mockEpListenerNotifier, sref, epd); + c.replay(); + EndpointRepository endpointRepo = new EndpointRepository(); + endpointRepo.setNotifier(mockEpListenerNotifier); + Executor executor = syncExecutor(); + TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor); + exportManager.add(rsa); + exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref)); + c.verify(); + + c.reset(); + mockEpListenerNotifier.notifyListeners(EasyMock.eq(NotifyType.REMOVED), + EasyMock.eq(Collections.singletonList(epd))); + EasyMock.expectLastCall().once(); + c.replay(); + exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref)); + c.verify(); + } + + @Test + public void testExportExisting() throws Exception { + IMocksControl c = EasyMock.createControl(); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class); + final ServiceReference sref = createUserService(c); + expectServiceExported(c, rsa, mockEpListenerNotifier, sref, createEndpoint()); + c.replay(); + + EndpointRepository endpointRepo = new EndpointRepository(); + endpointRepo.setNotifier(mockEpListenerNotifier); + TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor()); + exportManager.export(sref); + exportManager.add(rsa); + c.verify(); + } + + private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa, + final EndpointListenerNotifier mockEpListenerNotifier, + final ServiceReference sref, EndpointDescription epd) { + ExportRegistration exportRegistration = createExportRegistration(c, epd); + EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject())) + .andReturn(Collections.singletonList(exportRegistration)).once(); + mockEpListenerNotifier.notifyListeners(EasyMock.eq(NotifyType.ADDED), + EasyMock.eq(Collections.singletonList(epd))); + EasyMock.expectLastCall().once(); + } + + private Executor syncExecutor() { + return new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + + private ExportRegistration createExportRegistration(IMocksControl c, EndpointDescription endpoint) { + ExportRegistration exportRegistration = c.createMock(ExportRegistration.class); + ExportReference exportReference = c.createMock(ExportReference.class); + EasyMock.expect(exportRegistration.getExportReference()).andReturn(exportReference).anyTimes(); + EasyMock.expect(exportRegistration.getException()).andReturn(null).anyTimes(); + EasyMock.expect(exportReference.getExportedEndpoint()).andReturn(endpoint).anyTimes(); + return exportRegistration; + } + + private EndpointDescription createEndpoint() { + Map<String, Object> props = new HashMap<String, Object>(); + props.put(RemoteConstants.ENDPOINT_ID, "1"); + props.put(Constants.OBJECTCLASS, new String[] {"abc"}); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "cxf"); + return new EndpointDescription(props); + } + + private ServiceReference createUserService(IMocksControl c) { + final ServiceReference sref = c.createMock(ServiceReference.class); + EasyMock.expect(sref.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES))) + .andReturn("*").anyTimes(); + Bundle srefBundle = c.createMock(Bundle.class); + EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce(); + EasyMock.expect(sref.getProperty("objectClass")).andReturn("org.My").anyTimes(); + EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce(); + return sref; + } +}
