// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager;

import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.aries.rsa.spi.ExportPolicy;
import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bundle activator of the topology manager.
 * <p>
 * On start it registers a {@link DefaultExportPolicy} under {@code name=default} and opens a tracker
 * for the {@link ExportPolicy} selected by the {@value #RSA_EXPORT_POLICY_FILTER} framework property.
 * The export and import machinery is only brought up ({@link #doStart}) once a matching policy is
 * tracked, and is torn down again ({@link #doStop}) when a tracked policy goes away.
 */
public class Activator implements BundleActivator {
    /** Framework property holding an LDAP filter fragment that selects the ExportPolicy to use. */
    public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter";
    /** Filter matching every service that carries the {@code service.exported.interfaces} property. */
    static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);

    private TopologyManagerExport exportManager;
    private TopologyManagerImport importManager;
    private EndpointListenerNotifier notifier;
    private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker;
    private ThreadPoolExecutor exportExecutor;
    private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
    private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;

    /**
     * Registers the default export policy and starts tracking the configured policy.
     *
     * @param bc the context of this bundle
     * @throws Exception if the configured policy filter is not a valid LDAP filter
     */
    @Override
    public void start(final BundleContext bc) throws Exception {
        Dictionary<String, String> props = new Hashtable<String, String>();
        props.put("name", "default");
        bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props);

        Filter policyFilter = exportPolicyFilter(bc);
        policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) {

            @Override
            public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) {
                ExportPolicy policy = super.addingService(reference);
                // Only the first matching policy starts the managers; later ones are tracked but unused.
                if (exportManager == null) {
                    doStart(bc, policy);
                }
                return policy;
            }

            @Override
            public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) {
                // NOTE(review): this stops the managers when ANY tracked policy disappears, not only
                // the one handed to doStart - confirm that this is intended when several policies match.
                if (exportManager != null) {
                    doStop(bc);
                }
                super.removedService(reference, service);
            }
        };
        policyTracker.open();
    }

    /**
     * Builds the filter used to pick the export policy service.
     *
     * @param bc context used to read the {@value #RSA_EXPORT_POLICY_FILTER} property
     * @return a filter combining the ExportPolicy object class with the configured fragment,
     *         or with {@code (name=default)} when the property is not set
     * @throws InvalidSyntaxException if the resulting expression is not a valid filter
     */
    private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException {
        String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER);
        if (filter == null) {
            filter = "(name=default)";
        }
        return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter));
    }

    /**
     * Creates and wires the export/import managers and their trackers, then exports the
     * services that are already registered.
     *
     * @param bc the context of this bundle
     * @param policy the export policy handed to the export manager
     */
    public void doStart(final BundleContext bc, ExportPolicy policy) {
        LOG.debug("TopologyManager: start()");
        EndpointRepository endpointRepo = new EndpointRepository();
        notifier = new EndpointListenerNotifier(endpointRepo);
        epListenerTracker = new EndpointListenerTracker(bc);
        endpointRepo.setNotifier(notifier);
        // NOTE(review): the work queue is unbounded, so per the ThreadPoolExecutor contract the pool
        // is not expected to grow past its core size of 5; the maximum of 10 looks unused.
        exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
        importManager = new TopologyManagerImport(bc);
        rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
        bc.addServiceListener(exportManager);
        rsaTracker.open();
        epListenerTracker.open();
        exportExistingServices(bc);
        importManager.start();
    }

    /**
     * Closes the policy tracker. Closing it reports every tracked policy as removed, which in
     * turn triggers {@link #doStop} through the tracker's {@code removedService} callback.
     *
     * @param bc the context of this bundle
     */
    @Override
    public void stop(BundleContext bc) throws Exception {
        policyTracker.close();
    }

    /**
     * Tears down everything that {@link #doStart} set up and marks the managers as stopped
     * by clearing {@code exportManager}.
     *
     * @param bc the context of this bundle
     */
    public void doStop(BundleContext bc) {
        LOG.debug("TopologyManager: stop()");
        epListenerTracker.close();
        bc.removeServiceListener(exportManager);
        exportExecutor.shutdown();
        importManager.stop();
        rsaTracker.close();
        exportManager = null;
    }

    /**
     * Feeds a synthetic REGISTERED event to the export manager for every service that already
     * matches {@link #DOSGI_SERVICES}, so services registered before this bundle started are exported too.
     *
     * @param context context used to look up the existing services
     */
    public void exportExistingServices(BundleContext context) {
        try {
            // cast to String is necessary for compiling against OSGi core version >= 4.3
            ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES);
            if (references != null) {
                for (ServiceReference<?> sref : references) {
                    exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
                }
            }
        } catch (InvalidSyntaxException e) {
            // Pass the exception as last argument so the cause is not lost from the log.
            LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES, e);
        }
    }

    /**
     * Tracks EndpointListener services and keeps the notifier's listener registry in sync with them.
     */
    private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
        private EndpointListenerTracker(BundleContext context) {
            super(context, EndpointListener.class, null);
        }

        @Override
        public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
            EndpointListener listener = super.addingService(reference);
            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
            return listener;
        }

        @Override
        public void modifiedService(ServiceReference<EndpointListener> reference,
                                    EndpointListener listener) {
            super.modifiedService(reference, listener);
            // Re-adding replaces the stored filters with the ones from the updated scope.
            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
        }

        @Override
        public void removedService(ServiceReference<EndpointListener> reference,
                                   EndpointListener listener) {
            notifier.remove(listener);
            super.removedService(reference, listener);
        }
    }

    /**
     * Tracks RemoteServiceAdmin services and announces them to both the import and the export manager.
     */
    private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> {
        private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz,
                           ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) {
            super(context, clazz, customizer);
        }

        @Override
        public RemoteServiceAdmin addingService(ServiceReference<RemoteServiceAdmin> reference) {
            RemoteServiceAdmin rsa = super.addingService(reference);
            LOG.debug("New RemoteServiceAdmin {} detected, trying to import and export services with it", rsa);
            importManager.add(rsa);
            exportManager.add(rsa);
            return rsa;
        }

        @Override
        public void removedService(ServiceReference<RemoteServiceAdmin> reference,
                                   RemoteServiceAdmin rsa) {
            exportManager.remove(rsa);
            importManager.remove(rsa);
            super.removedService(reference, rsa);
        }
    }
}
// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/DefaultExportPolicy.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager.exporter;

import java.util.HashMap;
import java.util.Map;

import org.apache.aries.rsa.spi.ExportPolicy;
import org.osgi.framework.ServiceReference;

/**
 * Export policy that leaves the export of a service untouched: it contributes no
 * additional parameters for any service reference.
 */
public class DefaultExportPolicy implements ExportPolicy {

    /**
     * Supplies the extra export parameters for a service.
     *
     * @param sref the service about to be exported (not inspected by this policy)
     * @return a new, empty and mutable map on every call
     */
    @Override
    public Map<String, ?> additionalParameters(ServiceReference<?> sref) {
        Map<String, Object> noExtraParameters = new HashMap<String, Object>();
        return noExtraParameters;
    }

}
// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointListenerNotifier.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager.exporter;

import java.util.Dictionary;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps a registry of EndpointListeners together with their scope filters and forwards
 * endpoint added/removed notifications to every listener whose filters match the endpoint.
 */
public class EndpointListenerNotifier implements EndpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class);
    private enum NotifyType { ADDED, REMOVED }
    private final Map<EndpointListener, Set<Filter>> listeners;
    private final EndpointRepository endpointRepo;

    /**
     * @param endpointRepo repository queried for the already known endpoints when a listener is added
     */
    public EndpointListenerNotifier(final EndpointRepository endpointRepo) {
        this.endpointRepo = endpointRepo;
        this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>();
    }

    /**
     * Parses the scope property of an EndpointListener service into filters.
     *
     * @param sref reference of the EndpointListener service
     * @return the filters built from the {@code endpoint.listener.scope} values; entries that are
     *         not valid filter expressions are logged and left out
     */
    public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
        Set<Filter> filters = new HashSet<Filter>();
        List<String> scopes = StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
        for (String scope : scopes) {
            try {
                filters.add(FrameworkUtil.createFilter(scope));
            } catch (InvalidSyntaxException e) {
                LOG.error("invalid endpoint listener scope: {}", scope, e);
            }
        }
        return filters;
    }

    /**
     * Registers a listener (or replaces the filters of an already registered one) and immediately
     * notifies it about every endpoint currently held by the repository that matches its filters.
     *
     * @param ep the listener to register
     * @param filters the scope filters of the listener
     */
    public void add(EndpointListener ep, Set<Filter> filters) {
        LOG.debug("new EndpointListener detected");
        listeners.put(ep, filters);
        for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) {
            notifyListener(NotifyType.ADDED, ep, filters, endpoint);
        }
    }

    /**
     * Removes a listener from the registry; it receives no further notifications.
     *
     * @param ep the listener to remove
     */
    public void remove(EndpointListener ep) {
        LOG.debug("EndpointListener removed");
        listeners.remove(ep);
    }

    @Override
    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
        notifyListeners(NotifyType.ADDED, endpoint);
    }

    @Override
    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
        notifyListeners(NotifyType.REMOVED, endpoint);
    }

    /**
     * Notifies all registered endpoint listeners about an endpoint being added or removed.
     *
     * @param type specifies whether the endpoint was added or removed
     * @param endpoint the endpoint the listeners should be notified about
     */
    private void notifyListeners(NotifyType type, EndpointDescription endpoint) {
        // Iterate over entries instead of keySet() + get(): a listener removed concurrently between
        // the two calls would otherwise yield null filters and fail in getMatchingFilters.
        for (Map.Entry<EndpointListener, Set<Filter>> entry : listeners.entrySet()) {
            notifyListener(type, entry.getKey(), entry.getValue(), endpoint);
        }
    }

    /**
     * Notifies a single endpoint listener about an endpoint being added or removed. The listener
     * is called once per matching filter, with that filter's string form as the matched filter.
     *
     * @param type specifies whether the endpoint was added or removed
     * @param listener the EndpointListener to notify
     * @param filters the scope filters of the listener
     * @param endpoint the endpoint the listener should be notified about
     */
    private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters,
                                EndpointDescription endpoint) {
        LOG.debug("Endpoint {}", type);
        Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
        for (Filter filter : matchingFilters) {
            if (type == NotifyType.ADDED) {
                listener.endpointAdded(endpoint, filter.toString());
            } else {
                listener.endpointRemoved(endpoint, filter.toString());
            }
        }
    }

    /**
     * Selects the filters that match the properties of an endpoint.
     *
     * @param filters the candidate filters
     * @param endpoint the endpoint whose properties are matched
     * @return the subset of {@code filters} matching the endpoint, possibly empty
     */
    private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) {
        Set<Filter> matchingFilters = new HashSet<Filter>();
        Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
        for (Filter filter : filters) {
            if (filter.match(dict)) {
                LOG.debug("Filter {} matches endpoint {}", filter, dict);
                matchingFilters.add(filter);
            } else {
                LOG.trace("Filter {} does not match endpoint {}", filter, dict);
            }
        }
        return matchingFilters;
    }

}
// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointRepository.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager.exporter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Book-keeping of everything a TopologyManager has exported. Every exported ServiceReference maps
 * to a second map that records, per RemoteServiceAdmin, the endpoints that admin created for the
 * service. Changes are reported to the configured notifier.
 */
@SuppressWarnings("rawtypes")
public class EndpointRepository {

    private static final Logger LOG = LoggerFactory.getLogger(EndpointRepository.class);

    private final Map<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> exportedServices
        = new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>>();

    private EndpointListener notifier;

    /**
     * @param notifier the listener that is told about every endpoint added to or removed from this repository
     */
    public void setNotifier(EndpointListener notifier) {
        this.notifier = notifier;
    }


    /**
     * Drops every endpoint that was created by the given rsa and reports them as removed.
     *
     * @param rsa the RemoteServiceAdmin to remove
     * @return list of removed endpoints
     */
    public synchronized List<EndpointDescription> removeRemoteServiceAdmin(RemoteServiceAdmin rsa) {
        LOG.debug("RemoteServiceAdmin removed: {}", rsa.getClass().getName());
        List<EndpointDescription> gone = new ArrayList<EndpointDescription>();
        for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> perRsa : exportedServices.values()) {
            Collection<EndpointDescription> created = perRsa.get(rsa);
            if (created == null) {
                continue;
            }
            gone.addAll(created);
            perRsa.remove(rsa);
        }
        endpointsRemoved(gone);
        return gone;
    }

    /**
     * Forgets a service together with all of its endpoints and reports those endpoints as removed.
     *
     * @param sref the service that is no longer exported
     */
    public synchronized void removeService(ServiceReference sref) {
        List<EndpointDescription> gone = new ArrayList<EndpointDescription>();
        Map<RemoteServiceAdmin, Collection<EndpointDescription>> perRsa = exportedServices.get(sref);
        if (perRsa != null) {
            for (Collection<EndpointDescription> created : perRsa.values()) {
                gone.addAll(created);
            }
            exportedServices.remove(sref);
        }
        endpointsRemoved(gone);
    }

    /**
     * Marks a service for export. A service that is already known is left untouched.
     *
     * @param sref the service to mark
     */
    public synchronized void addService(ServiceReference sref) {
        if (exportedServices.containsKey(sref)) {
            return;
        }
        LOG.info("Marking service from bundle {} for export", sref.getBundle().getSymbolicName());
        exportedServices.put(sref, new LinkedHashMap<RemoteServiceAdmin, Collection<EndpointDescription>>());
    }

    /**
     * Records the endpoints an rsa created for a service (marking the service for export first
     * if necessary) and reports them as added.
     *
     * @param sref the exported service
     * @param rsa the RemoteServiceAdmin that created the endpoints
     * @param endpoints the created endpoints
     */
    public synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
                                          List<EndpointDescription> endpoints) {
        addService(sref);
        exportedServices.get(sref).put(rsa, endpoints);
        endpointsAdded(endpoints);
    }

    /**
     * @param sref the service to look up
     * @param rsa the RemoteServiceAdmin to look up
     * @return true if endpoints of the rsa are recorded for the service
     */
    synchronized boolean isAlreadyExportedForRsa(ServiceReference sref, RemoteServiceAdmin rsa) {
        Map<RemoteServiceAdmin, Collection<EndpointDescription>> perRsa = exportedServices.get(sref);
        if (perRsa == null) {
            return false;
        }
        return perRsa.containsKey(rsa);
    }

    /**
     * @return a new collection holding the endpoints of all services and all rsas
     */
    public synchronized Collection<EndpointDescription> getAllEndpoints() {
        List<EndpointDescription> everything = new ArrayList<EndpointDescription>();
        for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> perRsa : exportedServices.values()) {
            for (Collection<EndpointDescription> created : perRsa.values()) {
                everything.addAll(created);
            }
        }
        return everything;
    }

    /**
     * @param rsa the RemoteServiceAdmin to check
     * @return the services marked for export for which the rsa has no recorded endpoints yet
     */
    public synchronized Set<ServiceReference> getServicesToBeExportedFor(RemoteServiceAdmin rsa) {
        Set<ServiceReference> pending = new HashSet<ServiceReference>();
        for (Map.Entry<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> service
                : exportedServices.entrySet()) {
            boolean handledByRsa = service.getValue().containsKey(rsa);
            if (handledByRsa) {
                continue;
            }
            pending.add(service.getKey());
        }
        return pending;
    }

    /** Reports each endpoint as added to the notifier, without a matched filter. */
    private void endpointsAdded(List<EndpointDescription> endpoints) {
        for (EndpointDescription added : endpoints) {
            notifier.endpointAdded(added, null);
        }
    }

    /** Reports each endpoint as removed to the notifier, without a matched filter. */
    private void endpointsRemoved(List<EndpointDescription> endpoints) {
        for (EndpointDescription removed : endpoints) {
            notifier.endpointRemoved(removed, null);
        }
    }
}
// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager.exporter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;

import org.apache.aries.rsa.spi.ExportPolicy;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.ExportReference;
import org.osgi.service.remoteserviceadmin.ExportRegistration;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages exported endpoints of DOSGi services and notifies EndpointListeners of changes.
 *
 * <ul>
 * <li> Local RemoteServiceAdmin instances are announced through {@link #add} and {@link #remove}
 * <li> Uses a ServiceListener to track local OSGi services
 * <li> When a service is published that is supported by DOSGi the
 *      known RemoteServiceAdmins are instructed to export the service and
 *      the EndpointListeners are notified
 * <li> When a service is unpublished the EndpointListeners are notified.
 *      The endpoints are not closed as the ExportRegistration takes care of this
 * </ul>
 */
public class TopologyManagerExport implements ServiceListener {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerExport.class);

    private final Executor execService;
    private final EndpointRepository endpointRepo;
    private final ExportPolicy policy;
    // Mutated by add/remove while export tasks running on the executor iterate it, so it has to
    // tolerate concurrent modification; a plain HashSet does not.
    private final Set<RemoteServiceAdmin> rsaSet;


    /**
     * @param endpointRepo repository recording which service was exported by which RemoteServiceAdmin
     * @param executor executor on which the actual export work is run
     * @param policy supplies additional export parameters per service
     */
    public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor, ExportPolicy policy) {
        this.endpointRepo = endpointRepo;
        this.policy = policy;
        this.rsaSet = new CopyOnWriteArraySet<RemoteServiceAdmin>();
        this.execService = executor;
    }

    // track all service registrations so we can export any services that are configured to be exported
    // ServiceListener events may be delivered out of order, concurrently, re-entrant, etc. (see spec or docs)
    @Override
    public void serviceChanged(ServiceEvent event) {
        ServiceReference<?> sref = event.getServiceReference();
        if (event.getType() == ServiceEvent.REGISTERED) {
            LOG.debug("Received REGISTERED ServiceEvent: {}", event);
            export(sref);
        } else if (event.getType() == ServiceEvent.UNREGISTERING) {
            LOG.debug("Received UNREGISTERING ServiceEvent: {}", event);
            endpointRepo.removeService(sref);
        }
    }

    /**
     * Makes a RemoteServiceAdmin known and schedules the export of every service that is marked
     * for export but has no endpoints from this admin yet.
     *
     * @param rsa the newly available RemoteServiceAdmin
     */
    public void add(RemoteServiceAdmin rsa) {
        rsaSet.add(rsa);
        for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) {
            export(serviceRef);
        }
    }

    /**
     * Forgets a RemoteServiceAdmin and removes the endpoints recorded for it from the repository.
     *
     * @param rsa the RemoteServiceAdmin that went away
     */
    public void remove(RemoteServiceAdmin rsa) {
        rsaSet.remove(rsa);
        endpointRepo.removeRemoteServiceAdmin(rsa);
    }

    /** Schedules the export of a service on the executor. */
    private void export(final ServiceReference<?> sref) {
        execService.execute(new Runnable() {
            public void run() {
                doExport(sref);
            }
        });
    }

    /**
     * Exports a service with every known RemoteServiceAdmin that has not exported it yet.
     * Services that should not be exported according to {@link #shouldExport} are skipped.
     */
    private void doExport(final ServiceReference<?> sref) {
        Map<String, ?> addProps = policy.additionalParameters(sref);
        if (!shouldExport(sref, addProps)) {
            LOG.debug("Skipping service {}", sref);
            return;
        }
        LOG.debug("Exporting service {}", sref);
        endpointRepo.addService(sref); // mark for future export even if there are currently no RSAs
        if (rsaSet.isEmpty()) {
            LOG.error("No RemoteServiceAdmin available! Unable to export service from bundle {}, interfaces: {}",
                    getSymbolicName(sref.getBundle()),
                    sref.getProperty(org.osgi.framework.Constants.OBJECTCLASS));
            return;
        }

        for (RemoteServiceAdmin remoteServiceAdmin : rsaSet) {
            LOG.info("TopologyManager: handling remoteServiceAdmin {}", remoteServiceAdmin);
            if (endpointRepo.isAlreadyExportedForRsa(sref, remoteServiceAdmin)) {
                // already handled by this remoteServiceAdmin
                LOG.debug("already handled by this remoteServiceAdmin -> skipping");
            } else {
                exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
            }
        }
    }

    /**
     * Decides whether a service is to be exported: the {@code service.exported.interfaces} value
     * from the policy's additional parameters wins over the one on the service itself, and the
     * effective value must be present and non-empty.
     */
    private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
        Object exported = sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
        Object addExported = addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
        Object effectiveExported = addExported != null ? addExported : exported;
        return hasContent(effectiveExported);
    }

    /**
     * Checks a String+ property value (String, String[] or Collection) for being non-empty.
     * The property may legally be an array or collection, so it must not be cast to String blindly.
     * Null and values of any other type count as empty.
     */
    private static boolean hasContent(Object value) {
        if (value instanceof String) {
            return !((String)value).isEmpty();
        }
        if (value instanceof String[]) {
            return ((String[])value).length > 0;
        }
        if (value instanceof Collection) {
            return !((Collection<?>)value).isEmpty();
        }
        return false;
    }

    private Object getSymbolicName(Bundle bundle) {
        return bundle == null ? null : bundle.getSymbolicName();
    }

    /**
     * Exports one service with one RemoteServiceAdmin and records the successfully created
     * endpoints in the repository. Failed registrations are closed; if the service turns out to be
     * unregistered before or after the export, the service is removed from the repository instead.
     */
    private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
                                                      final RemoteServiceAdmin remoteServiceAdmin,
                                                      Map<String, ?> addProps) {
        // abort if the service was unregistered by the time we got here
        // (we check again at the end, but this optimization saves unnecessary heavy processing)
        if (sref.getBundle() == null) {
            LOG.info("TopologyManager: export aborted for {} since it was unregistered", sref);
            endpointRepo.removeService(sref);
            return;
        }
        // do the export, handing over the additional parameters supplied by the export policy
        LOG.debug("exporting {}...", sref);
        Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, addProps);
        // process successful/failed registrations
        List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
        for (ExportRegistration reg : exportRegs) {
            if (reg.getException() == null) {
                EndpointDescription endpoint = getExportedEndpoint(reg);
                if (endpoint == null) {
                    // getExportedEndpoint may yield null; a null entry would break the listener
                    // notification later on, so leave it out.
                    LOG.warn("TopologyManager: export for {} provided no endpoint, ignoring it", sref);
                    continue;
                }
                LOG.info("TopologyManager: export succeeded for {}, endpoint {}", sref, endpoint);
                endpoints.add(endpoint);
            } else {
                // the exception goes last so the logger records it as the cause
                LOG.error("TopologyManager: export failed for {}", sref, reg.getException());
                reg.close();
            }
        }
        // abort export if service was unregistered in the meanwhile (since we have a race
        // with the unregister event which may have already been handled, so we'll miss it)
        if (sref.getBundle() == null) {
            LOG.info("TopologyManager: export reverted for {} since service was unregistered", sref);
            endpointRepo.removeService(sref);
            for (ExportRegistration reg : exportRegs) {
                reg.close();
            }
            return;
        }
        // add the new exported endpoints
        if (!endpoints.isEmpty()) {
            LOG.info("TopologyManager: export successful for {}, endpoints: {}", sref, endpoints);
            endpointRepo.addEndpoints(sref, remoteServiceAdmin, endpoints);
        }
    }

    /**
     * Retrieves an exported Endpoint (while safely handling nulls).
     *
     * @param exReg an export registration
     * @return exported Endpoint or null if not present
     */
    private EndpointDescription getExportedEndpoint(ExportRegistration exReg) {
        ExportReference ref = (exReg == null) ? null : exReg.getExportReference();
        return (ref == null) ? null : ref.getExportedEndpoint();
    }


}
// File: topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.aries.rsa.topologymanager.importer;

import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages an EndpointListener and adjusts its scope according to requested service filters.
 * <p>
 * The listener is registered as a service whose {@code endpoint.listener.scope} property holds
 * the current list of filters; every scope change republishes that property.
 */
public class EndpointListenerManager {

    private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);

    private final BundleContext bctx;
    private volatile ServiceRegistration<EndpointListener> serviceRegistration;
    private final List<String> filters = new ArrayList<String>();
    private final EndpointListener endpointListener;

    /**
     * @param bc context used to register the listener service
     * @param endpointListener the listener to publish
     */
    public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
        this.bctx = bc;
        this.endpointListener = endpointListener;
    }

    /** Registers the listener as an EndpointListener service with the current scope. */
    protected void start() {
        serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
                                                   getRegistrationProperties());
    }

    /**
     * Unregisters the listener service if it is registered. The registration is forgotten first,
     * so that repeated calls and scope changes arriving after the stop do not touch an already
     * unregistered registration (which would raise an IllegalStateException).
     */
    public void stop() {
        ServiceRegistration<EndpointListener> registration = serviceRegistration;
        serviceRegistration = null;
        if (registration != null) {
            registration.unregister();
        }
    }

    /**
     * Adds a filter to the scope and republishes the scope. A null filter is ignored.
     *
     * @param filter the filter to add
     */
    protected void extendScope(String filter) {
        if (filter == null) {
            return;
        }
        LOG.debug("EndpointListener: extending scope by {}", filter);
        synchronized (filters) {
            filters.add(filter);
        }
        updateRegistration();
    }

    /**
     * Removes one occurrence of a filter from the scope and republishes the scope.
     * A null filter is ignored.
     *
     * @param filter the filter to remove
     */
    protected void reduceScope(String filter) {
        if (filter == null) {
            return;
        }
        LOG.debug("EndpointListener: reducing scope by {}", filter);
        synchronized (filters) {
            filters.remove(filter);
        }
        updateRegistration();
    }

    /**
     * @return service properties carrying a snapshot copy of the current filters as listener scope
     */
    private Dictionary<String, Object> getRegistrationProperties() {
        Dictionary<String, Object> p = new Hashtable<String, Object>();

        synchronized (filters) {
            LOG.debug("Current filter: {}", filters);
            p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
        }

        return p;
    }

    /** Pushes the current scope to the service registration, if the listener is registered. */
    private void updateRegistration() {
        // Read the volatile field once so that a concurrent stop() cannot null it between
        // the check and the use.
        ServiceRegistration<EndpointListener> registration = serviceRegistration;
        if (registration != null) {
            registration.setProperties(getRegistrationProperties());
        }
    }
}
+ */ +package org.apache.aries.rsa.topologymanager.importer; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.osgi.framework.Constants; + +public final class FilterHelper { + private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([a-zA-Z_0-9.]+)\\).*"; + private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION); + + private FilterHelper() { + // prevent instantiation + } + + public static String getObjectClass(String filter) { + if (filter != null) { + Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter); + if (matcher.matches() && matcher.groupCount() >= 1) { + return matcher.group(1); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java new file mode 100644 index 0000000..6766fc1 --- /dev/null +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.topologymanager.importer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.hooks.service.ListenerHook; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for service listeners and informs ServiceInterestListener about added and removed interest + * in services + */ +public class ListenerHookImpl implements ListenerHook { + + private static final Logger LOG = LoggerFactory.getLogger(ListenerHookImpl.class); + + // From the old impl. 
+ private static final Set<String> SYSTEM_PACKAGES; + static { + SYSTEM_PACKAGES = new HashSet<String>(); + SYSTEM_PACKAGES.add("org.osgi.service"); + SYSTEM_PACKAGES.add("org.apache.felix"); + SYSTEM_PACKAGES.add("org.ops4j.pax.logging"); + SYSTEM_PACKAGES.add("ch.ethz.iks.slp"); + SYSTEM_PACKAGES.add("org.ungoverned.osgi.service"); + SYSTEM_PACKAGES.add("org.springframework.osgi.context.event.OsgiBundleApplicationContextListener"); + SYSTEM_PACKAGES.add("java.net.ContentHandler"); + } + + private final BundleContext bctx; + private final ServiceInterestListener serviceInterestListener; + private final String frameworkUUID; + + public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) { + this.bctx = bc; + this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID); + this.serviceInterestListener = serviceInterestListener; + } + + @Override + public void added(Collection<ListenerInfo> listeners) { + LOG.debug("added listeners {}", listeners); + for (ListenerInfo listenerInfo : listeners) { + LOG.debug("Filter {}", listenerInfo.getFilter()); + + String className = FilterHelper.getObjectClass(listenerInfo.getFilter()); + + if (listenerInfo.getBundleContext().equals(bctx)) { + LOG.debug("ListenerHookImpl: skipping request from myself"); + continue; + } + + if (listenerInfo.getFilter() == null) { + LOG.debug("skipping empty filter"); + continue; + } + + if (isClassExcluded(className)) { + LOG.debug("Skipping import request for excluded class [{}]", className); + continue; + } + String exFilter = extendFilter(listenerInfo.getFilter()); + serviceInterestListener.addServiceInterest(exFilter); + } + } + + @Override + public void removed(Collection<ListenerInfo> listeners) { + LOG.debug("removed listeners {}", listeners); + + for (ListenerInfo listenerInfo : listeners) { + LOG.debug("Filter {}", listenerInfo.getFilter()); + + // TODO: determine if service was handled? 
+ String exFilter = extendFilter(listenerInfo.getFilter()); + serviceInterestListener.removeServiceInterest(exFilter); + } + } + + private static boolean isClassExcluded(String className) { + if (className == null) { + return true; + } + + for (String p : SYSTEM_PACKAGES) { + if (className.startsWith(p)) { + return true; + } + } + return false; + } + + String extendFilter(String filter) { + return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))"; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java new file mode 100644 index 0000000..4aa648f --- /dev/null +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Callback interface with one notification per {@link RemoteServiceAdmin} that was added or
 * removed.
 * <p>
 * NOTE(review): neither an implementation nor a caller of this interface is visible here;
 * presumably it is driven by a service tracker for RemoteServiceAdmin services - confirm.
 */
public interface RSATracker {

    /**
     * Called for a RemoteServiceAdmin that was added.
     *
     * @param rsa the added RemoteServiceAdmin
     */
    void added(RemoteServiceAdmin rsa);

    /**
     * Called for a RemoteServiceAdmin that was removed.
     *
     * @param rsa the removed RemoteServiceAdmin
     */
    void removed(RemoteServiceAdmin rsa);
}
/**
 * Keeps one reference count per key.
 * <p>
 * Counts live in a concurrent map and are changed with compare-and-set style operations that
 * are retried until they succeed. A key is only present while its count is at least 1.
 *
 * @param <K> the key type
 */
public class ReferenceCounter<K> {

    private final ConcurrentMap<K, Integer> counts = new ConcurrentHashMap<K, Integer>();

    /**
     * Adds one reference to the given key; a key without a count starts at 1.
     *
     * @param key a key
     * @return the reference count after the increment
     */
    public int add(K key) {
        for (;;) {
            Integer current = counts.get(key);
            int next = (current == null) ? 1 : current + 1;
            // either create the first entry or swap in the incremented value;
            // both fail if another thread changed the entry in the meantime
            boolean stored = (current == null)
                ? counts.putIfAbsent(key, next) == null
                : counts.replace(key, current, next);
            if (stored) {
                return next;
            }
        }
    }

    /**
     * Drops one reference from the given key and forgets the key once its count reaches 0.
     *
     * @param key a key
     * @return the reference count after the decrement, or -1 if the key had no count
     */
    public int remove(K key) {
        Integer current;
        while ((current = counts.get(key)) != null) {
            int next = current - 1;
            // the last reference removes the entry, any other one swaps in the lower value
            boolean stored = (next == 0)
                ? counts.remove(key, current)
                : counts.replace(key, current, next);
            if (stored) {
                return next;
            }
        }
        return -1;
    }
}
/**
 * Receives notifications about local interest in services, expressed as filter strings.
 * <p>
 * ListenerHookImpl calls this interface when service listeners are added or removed, and
 * TopologyManagerImport implements it.
 */
public interface ServiceInterestListener {

    /**
     * Signals that interest in services matching the filter was added.
     *
     * @param filter the filter describing the services of interest
     */
    void addServiceInterest(String filter);

    /**
     * Signals that interest in services matching the filter was removed.
     *
     * @param filter the filter describing the services that are no longer of interest
     */
    void removeServiceInterest(String filter);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.topologymanager.importer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.hooks.service.ListenerHook; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.ImportReference; +import org.osgi.service.remoteserviceadmin.ImportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager. + * Listens for local service interests using the ListenerHookImpl that calls back through the + * ServiceInterestListener interface. + * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services. 
+ */ +public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener { + + private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class); + private ExecutorService execService; + + private final EndpointListenerManager endpointListenerManager; + private final BundleContext bctx; + private Set<RemoteServiceAdmin> rsaSet; + private final ListenerHookImpl listenerHook; + + /** + * If set to false only one service is imported for each import interest even it multiple services are + * available. If set to true, all available services are imported. + * + * TODO: Make this available as a configuration option + */ + private boolean importAllAvailable = true; + + /** + * Contains an instance of the Class Import Interest for each distinct import request. If the same filter + * is requested multiple times the existing instance of the Object increments an internal reference + * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference + * counter until it reaches zero. in this case the interest is removed. 
+ */ + private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>(); + + /** + * List of Endpoints by matched filter that were reported by the EndpointListener and can be imported + */ + private final Map<String /* filter */, List<EndpointDescription>> importPossibilities + = new HashMap<String, List<EndpointDescription>>(); + + /** + * List of already imported Endpoints by their matched filter + */ + private final Map<String /* filter */, List<ImportRegistration>> importedServices + = new HashMap<String, List<ImportRegistration>>(); + + + public TopologyManagerImport(BundleContext bc) { + this.rsaSet = new HashSet<RemoteServiceAdmin>(); + bctx = bc; + endpointListenerManager = new EndpointListenerManager(bctx, this); + execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + listenerHook = new ListenerHookImpl(bc, this); + } + + public void start() { + bctx.registerService(RemoteServiceAdminListener.class, this, null); + bctx.registerService(ListenerHook.class, listenerHook, null); + endpointListenerManager.start(); + } + + public void stop() { + endpointListenerManager.stop(); + execService.shutdown(); + // this is called from Activator.stop(), which implicitly unregisters our registered services + } + + /* (non-Javadoc) + * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#addServiceInterest(java.lang.String) + */ + public void addServiceInterest(String filter) { + if (importInterestsCounter.add(filter) == 1) { + endpointListenerManager.extendScope(filter); + } + } + + /* (non-Javadoc) + * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#removeServiceInterest(java.lang.String) + */ + public void removeServiceInterest(String filter) { + if (importInterestsCounter.remove(filter) == 0) { + LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter); + endpointListenerManager.reduceScope(filter); + synchronized 
(importedServices) { + List<ImportRegistration> irs = importedServices.remove(filter); + if (irs != null) { + for (ImportRegistration ir : irs) { + ir.close(); + } + } + } + } + } + + public void endpointAdded(EndpointDescription endpoint, String filter) { + if (filter == null) { + LOG.error("Endpoint is not handled because no matching filter was provided!"); + return; + } + LOG.debug("importable service added for filter {}, endpoint {}", filter, endpoint); + addImportPossibility(endpoint, filter); + triggerImport(filter); + } + + public void endpointRemoved(EndpointDescription endpoint, String filter) { + LOG.debug("EndpointRemoved {}", endpoint); + removeImportPossibility(endpoint, filter); + triggerImport(filter); + } + + private void addImportPossibility(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + if (endpoints == null) { + endpoints = new ArrayList<EndpointDescription>(); + importPossibilities.put(filter, endpoints); + } + // prevent adding the same endpoint multiple times, which can happen sometimes, + // and which causes imports to remain available even when services are actually down + if (!endpoints.contains(endpoint)) { + endpoints.add(endpoint); + } + } + } + + private void removeImportPossibility(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + if (endpoints != null) { + endpoints.remove(endpoint); + if (endpoints.isEmpty()) { + importPossibilities.remove(filter); + } + } + } + } + + public void add(RemoteServiceAdmin rsa) { + rsaSet.add(rsa); + synchronized (importPossibilities) { + for (String filter : importPossibilities.keySet()) { + triggerImport(filter); + } + } + } + + public void remove(RemoteServiceAdmin rsa) { + rsaSet.remove(rsa); + } + + + private void triggerImport(final String filter) { + LOG.debug("Import of a 
service for filter {} was queued", filter); + + execService.execute(new Runnable() { + public void run() { + try { + unexportNotAvailableServices(filter); + importServices(filter); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + // Notify EndpointListeners? NO! + } + }); + } + + private void unexportNotAvailableServices(String filter) { + synchronized (importedServices) { + List<ImportRegistration> importRegistrations = importedServices.get(filter); + if (importRegistrations != null) { + // iterate over a copy + for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) { + EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint(); + if (!isImportPossibilityAvailable(endpoint, filter)) { + removeImport(ir, null); // also unexports the service + } + } + } + } + } + + private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + return endpoints != null && endpoints.contains(endpoint); + } + } + + // return a copy to prevent sync issues + private List<EndpointDescription> getImportPossibilitiesCopy(String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> possibilities = importPossibilities.get(filter); + return possibilities == null + ? 
Collections.<EndpointDescription>emptyList() + : new ArrayList<EndpointDescription>(possibilities); + } + } + + private void importServices(String filter) { + synchronized (importedServices) { + List<ImportRegistration> importRegistrations = importedServices.get(filter); + for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) { + // TODO but optional: if the service is already imported and the endpoint is still + // in the list of possible imports check if a "better" endpoint is now in the list + if (!alreadyImported(endpoint, importRegistrations)) { + // service not imported yet -> import it now + ImportRegistration ir = importService(endpoint); + if (ir != null) { + // import was successful + if (importRegistrations == null) { + importRegistrations = new ArrayList<ImportRegistration>(); + importedServices.put(filter, importRegistrations); + } + importRegistrations.add(ir); + if (!importAllAvailable) { + return; + } + } + } + } + } + } + + private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) { + if (importRegistrations != null) { + for (ImportRegistration ir : importRegistrations) { + if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) { + return true; + } + } + } + return false; + } + + /** + * Tries to import the service with each rsa until one import is successful + * + * @param endpoint endpoint to import + * @return import registration of the first successful import + */ + private ImportRegistration importService(EndpointDescription endpoint) { + for (RemoteServiceAdmin rsa : rsaSet) { + ImportRegistration ir = rsa.importService(endpoint); + if (ir != null) { + if (ir.getException() == null) { + LOG.debug("Service import was successful {}", ir); + return ir; + } else { + LOG.info("Error importing service " + endpoint, ir.getException()); + } + } + } + return null; + } + + /** + * Remove and close (unexport) the given import. 
The import is specified either + * by its ImportRegistration or by its ImportReference (only one of them must + * be specified). + * <p> + * If this method is called from within iterations on the underlying data structure, + * the iterations must be made on copies of the structures rather than the original + * references in order to prevent ConcurrentModificationExceptions. + * + * @param reg the import registration to remove + * @param ref the import reference to remove + */ + private void removeImport(ImportRegistration reg, ImportReference ref) { + // this method may be called recursively by calling ImportRegistration.close() + // and receiving a RemoteServiceAdminEvent for its unregistration, which results + // in a ConcurrentModificationException. We avoid this by closing the registrations + // only after data structure manipulation is done, and being re-entrant. + synchronized (importedServices) { + List<ImportRegistration> removed = new ArrayList<ImportRegistration>(); + for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) { + Collection<ImportRegistration> irs = it1.next(); + for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) { + ImportRegistration ir = it2.next(); + if (ir.equals(reg) || ir.getImportReference().equals(ref)) { + removed.add(ir); + it2.remove(); + } + } + if (irs.isEmpty()) { + it1.remove(); + } + } + for (ImportRegistration ir : removed) { + ir.close(); + } + } + } + + public void remoteAdminEvent(RemoteServiceAdminEvent event) { + if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) { + removeImport(null, event.getImportReference()); + } + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java 
b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java deleted file mode 100644 index 62ec1a9..0000000 --- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.cxf.dosgi.topologymanager; - -import java.util.Dictionary; -import java.util.Hashtable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.cxf.dosgi.dsw.api.ExportPolicy; -import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy; -import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier; -import org.apache.cxf.dosgi.topologymanager.exporter.EndpointRepository; -import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport; -import org.apache.cxf.dosgi.topologymanager.importer.TopologyManagerImport; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Filter; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceEvent; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.service.remoteserviceadmin.RemoteConstants; -import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Activator implements BundleActivator { - public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter"; - static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)"; - private static final Logger LOG = LoggerFactory.getLogger(Activator.class); - - private TopologyManagerExport exportManager; - private TopologyManagerImport importManager; - private EndpointListenerNotifier notifier; - private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker; - private ThreadPoolExecutor exportExecutor; - private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker; - 
private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker; - - public void start(final BundleContext bc) throws Exception { - Dictionary<String, String> props = new Hashtable<String, String>(); - props.put("name", "default"); - bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props); - - Filter policyFilter = exportPolicyFilter(bc); - policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) { - - @Override - public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) { - ExportPolicy policy = super.addingService(reference); - if (exportManager == null) { - doStart(bc, policy); - } - return policy; - } - - @Override - public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) { - if (exportManager != null) { - doStop(bc); - } - super.removedService(reference, service); - } - }; - policyTracker.open(); - } - - private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException { - String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER); - if (filter == null) { - filter = "(name=default)"; - } - return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter)); - } - - public void doStart(final BundleContext bc, ExportPolicy policy) { - LOG.debug("TopologyManager: start()"); - EndpointRepository endpointRepo = new EndpointRepository(); - notifier = new EndpointListenerNotifier(endpointRepo); - epListenerTracker = new EndpointListenerTracker(bc); - endpointRepo.setNotifier(notifier); - exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy); - importManager = new TopologyManagerImport(bc); - rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null); - bc.addServiceListener(exportManager); - rsaTracker.open(); - epListenerTracker.open(); - exportExistingServices(bc); - 
importManager.start(); - } - - public void stop(BundleContext bc) throws Exception { - policyTracker.close(); - } - - public void doStop(BundleContext bc) { - LOG.debug("TopologyManager: stop()"); - epListenerTracker.close(); - bc.removeServiceListener(exportManager); - exportExecutor.shutdown(); - importManager.stop(); - rsaTracker.close(); - exportManager = null; - } - - public void exportExistingServices(BundleContext context) { - try { - // cast to String is necessary for compiling against OSGi core version >= 4.3 - ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES); - if (references != null) { - for (ServiceReference<?> sref : references) { - exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref)); - } - } - } catch (InvalidSyntaxException e) { - LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES); - } - } - - private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> { - private EndpointListenerTracker(BundleContext context) { - super(context, EndpointListener.class, null); - } - - @Override - public EndpointListener addingService(ServiceReference<EndpointListener> reference) { - EndpointListener listener = super.addingService(reference); - notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference)); - return listener; - } - - @Override - public void modifiedService(ServiceReference<EndpointListener> reference, - EndpointListener listener) { - super.modifiedService(reference, listener); - notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference)); - } - - @Override - public void removedService(ServiceReference<EndpointListener> reference, - EndpointListener listener) { - notifier.remove(listener); - super.removedService(reference, listener); - } - } - - private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> { - private 
RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz, - ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) { - super(context, clazz, customizer); - } - - @Override - public RemoteServiceAdmin addingService(ServiceReference<RemoteServiceAdmin> reference) { - RemoteServiceAdmin rsa = super.addingService(reference); - LOG.debug("New RemoteServiceAdmin {} detected, trying to import and export services with it", rsa); - importManager.add(rsa); - exportManager.add(rsa); - return rsa; - } - - @Override - public void removedService(ServiceReference<RemoteServiceAdmin> reference, - RemoteServiceAdmin rsa) { - exportManager.remove(rsa); - importManager.remove(rsa); - super.removedService(reference, rsa); - } - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java deleted file mode 100644 index 689ebab..0000000 --- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.exporter; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.cxf.dosgi.dsw.api.ExportPolicy; -import org.osgi.framework.ServiceReference; - -/** - * The default is to not customize the way services are exported - */ -public class DefaultExportPolicy implements ExportPolicy { - - @Override - public Map<String, ?> additionalParameters(ServiceReference<?> sref) { - return new HashMap<String, Object>(); - } - -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java deleted file mode 100644 index 13d7dab..0000000 --- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.topologymanager.exporter; - -import java.util.Dictionary; -import java.util.HashSet; -import java.util.Hashtable; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.osgi.framework.Filter; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tracks EndpointListeners and allows to notify them of endpoints. 
- */ -public class EndpointListenerNotifier implements EndpointListener { - private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class); - private enum NotifyType { ADDED, REMOVED }; - private Map<EndpointListener, Set<Filter>> listeners; - private EndpointRepository endpointRepo; - - public EndpointListenerNotifier(final EndpointRepository endpointRepo) { - this.endpointRepo = endpointRepo; - this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>(); - } - - public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) { - Set<Filter> filters = new HashSet<Filter>(); - String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)); - for (String scope : scopes) { - try { - filters.add(FrameworkUtil.createFilter(scope)); - } catch (InvalidSyntaxException e) { - LOG.error("invalid endpoint listener scope: {}", scope, e); - } - } - return filters; - } - - public void add(EndpointListener ep, Set<Filter> filters) { - LOG.debug("new EndpointListener detected"); - listeners.put(ep, filters); - for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) { - notifyListener(NotifyType.ADDED, ep, filters, endpoint); - } - } - - public void remove(EndpointListener ep) { - LOG.debug("EndpointListener modified"); - listeners.remove(ep); - } - - @Override - public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(NotifyType.ADDED, endpoint); - } - - @Override - public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(NotifyType.REMOVED, endpoint); - } - - /** - * Notifies all endpoint listeners about endpoints being added or removed. 
- * - * @param added specifies whether endpoints were added (true) or removed (false) - * @param endpoints the endpoints the listeners should be notified about - */ - private void notifyListeners(NotifyType type, EndpointDescription endpoint) { - for (EndpointListener listener : listeners.keySet()) { - notifyListener(type, listener, listeners.get(listener), endpoint); - } - } - - /** - * Notifies an endpoint listener about endpoints being added or removed. - * - * @param type specifies whether endpoints were added (true) or removed (false) - * @param endpointListenerRef the ServiceReference of an EndpointListener to notify - * @param endpoints the endpoints the listener should be notified about - */ - private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, - EndpointDescription endpoint) { - LOG.debug("Endpoint {}", type); - Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint); - for (Filter filter : matchingFilters) { - if (type == NotifyType.ADDED) { - listener.endpointAdded(endpoint, filter.toString()); - } else { - listener.endpointRemoved(endpoint, filter.toString()); - } - } - } - - private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) { - Set<Filter> matchingFilters = new HashSet<Filter>(); - Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties()); - for (Filter filter : filters) { - if (filter.match(dict)) { - LOG.debug("Filter {} matches endpoint {}", filter, dict); - matchingFilters.add(filter); - } else { - LOG.trace("Filter {} does not match endpoint {}", filter, dict); - } - } - return matchingFilters; - } - -}
