http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java new file mode 100644 index 0000000..f4db92e --- /dev/null +++ b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +public interface ServiceInterestListener { + + void addServiceInterest(String filter); + + void removeServiceInterest(String filter); +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java new file mode 100644 index 0000000..30fe4c2 --- /dev/null +++ b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.hooks.service.ListenerHook; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.ImportReference; +import org.osgi.service.remoteserviceadmin.ImportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager. + * Listens for local service interests using the ListenerHookImpl that calls back through the + * ServiceInterestListener interface. + * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services. 
+ */ +public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener { + + private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class); + private ExecutorService execService; + + private final EndpointListenerManager endpointListenerManager; + private final BundleContext bctx; + private Set<RemoteServiceAdmin> rsaSet; + private final ListenerHookImpl listenerHook; + + /** + * If set to false only one service is imported for each import interest even if multiple services are + * available. If set to true, all available services are imported. + * + * TODO: Make this available as a configuration option + */ + private boolean importAllAvailable = true; + + /** + * Contains an instance of the Class Import Interest for each distinct import request. If the same filter + * is requested multiple times the existing instance of the Object increments an internal reference + * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference + * counter until it reaches zero. In this case the interest is removed. 
+ */ + private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>(); + + /** + * List of Endpoints by matched filter that were reported by the EndpointListener and can be imported + */ + private final Map<String /* filter */, List<EndpointDescription>> importPossibilities + = new HashMap<String, List<EndpointDescription>>(); + + /** + * List of already imported Endpoints by their matched filter + */ + private final Map<String /* filter */, List<ImportRegistration>> importedServices + = new HashMap<String, List<ImportRegistration>>(); + + + public TopologyManagerImport(BundleContext bc) { + this.rsaSet = new HashSet<RemoteServiceAdmin>(); + bctx = bc; + endpointListenerManager = new EndpointListenerManager(bctx, this); + execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + listenerHook = new ListenerHookImpl(bc, this); + } + + public void start() { + bctx.registerService(RemoteServiceAdminListener.class, this, null); + bctx.registerService(ListenerHook.class, listenerHook, null); + endpointListenerManager.start(); + } + + public void stop() { + endpointListenerManager.stop(); + execService.shutdown(); + // this is called from Activator.stop(), which implicitly unregisters our registered services + } + + /* (non-Javadoc) + * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#addServiceInterest(java.lang.String) + */ + public void addServiceInterest(String filter) { + if (importInterestsCounter.add(filter) == 1) { + endpointListenerManager.extendScope(filter); + } + } + + /* (non-Javadoc) + * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#removeServiceInterest(java.lang.String) + */ + public void removeServiceInterest(String filter) { + if (importInterestsCounter.remove(filter) == 0) { + LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter); + endpointListenerManager.reduceScope(filter); + synchronized 
(importedServices) { + List<ImportRegistration> irs = importedServices.remove(filter); + if (irs != null) { + for (ImportRegistration ir : irs) { + ir.close(); + } + } + } + } + } + + public void endpointAdded(EndpointDescription endpoint, String filter) { + if (filter == null) { + LOG.error("Endpoint is not handled because no matching filter was provided!"); + return; + } + LOG.debug("importable service added for filter {}, endpoint {}", filter, endpoint); + addImportPossibility(endpoint, filter); + triggerImport(filter); + } + + public void endpointRemoved(EndpointDescription endpoint, String filter) { + LOG.debug("EndpointRemoved {}", endpoint); + removeImportPossibility(endpoint, filter); + triggerImport(filter); + } + + private void addImportPossibility(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + if (endpoints == null) { + endpoints = new ArrayList<EndpointDescription>(); + importPossibilities.put(filter, endpoints); + } + // prevent adding the same endpoint multiple times, which can happen sometimes, + // and which causes imports to remain available even when services are actually down + if (!endpoints.contains(endpoint)) { + endpoints.add(endpoint); + } + } + } + + private void removeImportPossibility(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + if (endpoints != null) { + endpoints.remove(endpoint); + if (endpoints.isEmpty()) { + importPossibilities.remove(filter); + } + } + } + } + + public void add(RemoteServiceAdmin rsa) { + rsaSet.add(rsa); + synchronized (importPossibilities) { + for (String filter : importPossibilities.keySet()) { + triggerImport(filter); + } + } + } + + public void remove(RemoteServiceAdmin rsa) { + rsaSet.remove(rsa); + } + + + private void triggerImport(final String filter) { + LOG.debug("Import of a 
service for filter {} was queued", filter); + + execService.execute(new Runnable() { + public void run() { + try { + unexportNotAvailableServices(filter); + importServices(filter); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + // Notify EndpointListeners? NO! + } + }); + } + + private void unexportNotAvailableServices(String filter) { + synchronized (importedServices) { + List<ImportRegistration> importRegistrations = importedServices.get(filter); + if (importRegistrations != null) { + // iterate over a copy + for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) { + EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint(); + if (!isImportPossibilityAvailable(endpoint, filter)) { + removeImport(ir, null); // also unexports the service + } + } + } + } + } + + private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> endpoints = importPossibilities.get(filter); + return endpoints != null && endpoints.contains(endpoint); + } + } + + // return a copy to prevent sync issues + private List<EndpointDescription> getImportPossibilitiesCopy(String filter) { + synchronized (importPossibilities) { + List<EndpointDescription> possibilities = importPossibilities.get(filter); + return possibilities == null + ? 
Collections.<EndpointDescription>emptyList() + : new ArrayList<EndpointDescription>(possibilities); + } + } + + private void importServices(String filter) { + synchronized (importedServices) { + List<ImportRegistration> importRegistrations = importedServices.get(filter); + for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) { + // TODO but optional: if the service is already imported and the endpoint is still + // in the list of possible imports check if a "better" endpoint is now in the list + if (!alreadyImported(endpoint, importRegistrations)) { + // service not imported yet -> import it now + ImportRegistration ir = importService(endpoint); + if (ir != null) { + // import was successful + if (importRegistrations == null) { + importRegistrations = new ArrayList<ImportRegistration>(); + importedServices.put(filter, importRegistrations); + } + importRegistrations.add(ir); + if (!importAllAvailable) { + return; + } + } + } + } + } + } + + private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) { + if (importRegistrations != null) { + for (ImportRegistration ir : importRegistrations) { + if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) { + return true; + } + } + } + return false; + } + + /** + * Tries to import the service with each rsa until one import is successful + * + * @param endpoint endpoint to import + * @return import registration of the first successful import + */ + private ImportRegistration importService(EndpointDescription endpoint) { + for (RemoteServiceAdmin rsa : rsaSet) { + ImportRegistration ir = rsa.importService(endpoint); + if (ir != null) { + if (ir.getException() == null) { + LOG.debug("Service import was successful {}", ir); + return ir; + } else { + LOG.info("Error importing service " + endpoint, ir.getException()); + } + } + } + return null; + } + + /** + * Remove and close (unexport) the given import. 
The import is specified either + * by its ImportRegistration or by its ImportReference (only one of them must + * be specified). + * <p> + * If this method is called from within iterations on the underlying data structure, + * the iterations must be made on copies of the structures rather than the original + * references in order to prevent ConcurrentModificationExceptions. + * + * @param reg the import registration to remove + * @param ref the import reference to remove + */ + private void removeImport(ImportRegistration reg, ImportReference ref) { + // this method may be called recursively by calling ImportRegistration.close() + // and receiving a RemoteServiceAdminEvent for its unregistration, which results + // in a ConcurrentModificationException. We avoid this by closing the registrations + // only after data structure manipulation is done, and being re-entrant. + synchronized (importedServices) { + List<ImportRegistration> removed = new ArrayList<ImportRegistration>(); + for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) { + Collection<ImportRegistration> irs = it1.next(); + for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) { + ImportRegistration ir = it2.next(); + if (ir.equals(reg) || ir.getImportReference().equals(ref)) { + removed.add(ir); + it2.remove(); + } + } + if (irs.isEmpty()) { + it1.remove(); + } + } + for (ImportRegistration ir : removed) { + ir.close(); + } + } + } + + public void remoteAdminEvent(RemoteServiceAdminEvent event) { + if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) { + removeImport(null, event.getImportReference()); + } + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java ---------------------------------------------------------------------- diff --git 
a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java new file mode 100644 index 0000000..100e3a3 --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.dosgi.topologymanager; + +import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy; +import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class ActivatorTest { + + @Test + public void testStart() throws Exception { + IMocksControl c = EasyMock.createNiceControl(); + BundleContext context = c.createMock(BundleContext.class); + EasyMock.expect(context.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid"); + context.addServiceListener(EasyMock.isA(TopologyManagerExport.class)); + EasyMock.expectLastCall(); + final Capture<String> filter = EasyMock.newCapture(); + EasyMock.expect(context.createFilter(EasyMock.capture(filter))) + .andAnswer(new IAnswer<Filter>() { + public Filter answer() throws Throwable { + return FrameworkUtil.createFilter(filter.getValue()); + } + }).times(2); + ServiceReference<?> sref = c.createMock(ServiceReference.class); + Bundle bundle = c.createMock(Bundle.class); + EasyMock.expect(sref.getBundle()).andReturn(bundle).anyTimes(); + EasyMock.expect(context.getServiceReferences((String)null, Activator.DOSGI_SERVICES)) + .andReturn(new ServiceReference[]{sref}); + + c.replay(); + Activator activator = new Activator(); + activator.doStart(context, new DefaultExportPolicy()); + c.verify(); + + c.reset(); + c.replay(); + activator.doStop(context); + c.verify(); + } + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java 
---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java new file mode 100644 index 0000000..04bd017 --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.dosgi.topologymanager.exporter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Dictionary; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@SuppressWarnings({ + "rawtypes", "unchecked" + }) +public class EndpointListenerNotifierTest { + + @Test + public void testNotifyListener() throws InvalidSyntaxException { + EndpointDescription endpoint1 = createEndpoint("myClass"); + EndpointDescription endpoint2 = createEndpoint("notMyClass"); + + // Expect listener to be called for endpoint1 but not for endpoint2 + EndpointListener epl = listenerExpects(endpoint1, "(objectClass=myClass)"); + + EndpointRepository exportRepository = new EndpointRepository(); + EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository); + + EasyMock.replay(epl); + Set<Filter> filters = new HashSet<Filter>(); + filters.add(FrameworkUtil.createFilter("(objectClass=myClass)")); + tm.add(epl, filters); + tm.endpointAdded(endpoint1, null); + tm.endpointAdded(endpoint2, null); + tm.endpointRemoved(endpoint1, null); + tm.endpointRemoved(endpoint2, null); + EasyMock.verify(epl); + } + + private EndpointListener listenerExpects(EndpointDescription endpoint, String filter) { + EndpointListener epl = EasyMock.createStrictMock(EndpointListener.class); + 
epl.endpointAdded(EasyMock.eq(endpoint), EasyMock.eq(filter)); + EasyMock.expectLastCall().once(); + epl.endpointRemoved(EasyMock.eq(endpoint), EasyMock.eq(filter)); + EasyMock.expectLastCall().once(); + return epl; + } + + @Test + public void testNotifyListeners() throws InvalidSyntaxException { + EndpointDescription endpoint1 = createEndpoint("myClass"); + + EndpointListener epl = EasyMock.createStrictMock(EndpointListener.class); + epl.endpointAdded(EasyMock.eq(endpoint1), EasyMock.eq("(objectClass=myClass)")); + EasyMock.expectLastCall().once(); + epl.endpointRemoved(EasyMock.eq(endpoint1), EasyMock.eq("(objectClass=myClass)")); + EasyMock.expectLastCall().once(); + + EndpointRepository exportRepository = new EndpointRepository(); + EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository); + + EasyMock.replay(epl); + Set<Filter> filters = new HashSet<Filter>(); + filters.add(FrameworkUtil.createFilter("(objectClass=myClass)")); + tm.add(epl, filters); + tm.endpointAdded(endpoint1, null); + tm.endpointRemoved(endpoint1, null); + tm.remove(epl); + EasyMock.verify(epl); + } + + public EndpointDescription createEndpoint(String iface) { + Map<String, Object> props = new Hashtable<String, Object>(); + props.put("objectClass", new String[]{iface}); + props.put(RemoteConstants.ENDPOINT_ID, iface); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any"); + return new EndpointDescription(props); + } + + @Test + public void testNormalizeScopeForSingleString() { + ServiceReference sr = createListenerServiceWithFilter("(myProp=A)"); + Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + assertEquals(1, res.size()); + Filter filter = res.iterator().next(); + filterMatches(filter); + } + + @Test + public void testNormalizeScopeForStringArray() { + String[] filters = {"(myProp=A)", "(otherProp=B)"}; + ServiceReference sr = createListenerServiceWithFilter(filters); + Set<Filter> res = 
EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + assertEquals(filters.length, res.size()); + Iterator<Filter> it = res.iterator(); + Filter filter1 = it.next(); + Filter filter2 = it.next(); + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + assertThat(filter1.match(props) || filter2.match(props), is(true)); + } + + @Test + public void testNormalizeScopeForCollection() { + Collection<String> collection = Arrays.asList("(myProp=A)", "(otherProp=B)"); + ServiceReference sr = createListenerServiceWithFilter(collection); + Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr); + Iterator<Filter> it = res.iterator(); + Filter filter1 = it.next(); + Filter filter2 = it.next(); + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + Assert.assertThat(filter1.match(props) || filter2.match(props), is(true)); + } + + private void filterMatches(Filter filter) { + Dictionary<String, String> props = new Hashtable(); + props.put("myProp", "A"); + Assert.assertTrue("Filter should match", filter.match(props)); + } + + private ServiceReference createListenerServiceWithFilter(Object filters) { + ServiceReference sr = EasyMock.createMock(ServiceReference.class); + EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)).andReturn(filters); + EasyMock.replay(sr); + return sr; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java new file mode 100644 index 0000000..cb07f43 --- /dev/null +++ 
b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.exporter; + +import java.util.Arrays; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.Bundle; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; + +public class EndpointRepositoryTest { + + @Test + public void testAddRemove() throws InvalidSyntaxException { + EndpointDescription ep1 = createEndpoint("my"); + + IMocksControl c = EasyMock.createControl(); + ServiceReference<?> sref = createService(c); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + EndpointListener notifier = c.createMock(EndpointListener.class); + + 
notifier.endpointAdded(ep1, null); + EasyMock.expectLastCall(); + + c.replay(); + EndpointRepository repo = new EndpointRepository(); + repo.setNotifier(notifier); + List<EndpointDescription> endpoints = Arrays.asList(ep1); + repo.addEndpoints(sref, rsa, endpoints); + c.verify(); + + c.reset(); + notifier.endpointRemoved(ep1, null); + EasyMock.expectLastCall(); + + c.replay(); + repo.removeRemoteServiceAdmin(rsa); + c.verify(); + } + + private ServiceReference<?> createService(IMocksControl c) { + ServiceReference<?> sref = c.createMock(ServiceReference.class); + Bundle bundle = c.createMock(Bundle.class); + EasyMock.expect(bundle.getSymbolicName()).andReturn("myBundle"); + EasyMock.expect(sref.getBundle()).andReturn(bundle); + return sref; + } + + public EndpointDescription createEndpoint(String iface) { + Map<String, Object> props = new Hashtable<String, Object>(); + props.put("objectClass", new String[]{iface}); + props.put(RemoteConstants.ENDPOINT_ID, iface); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any"); + return new EndpointDescription(props); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java new file mode 100644 index 0000000..0eda150 --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.exporter; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; + +import org.apache.cxf.dosgi.dsw.api.ExportPolicy; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.Bundle; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.ExportReference; +import org.osgi.service.remoteserviceadmin.ExportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; + +import static org.easymock.EasyMock.expectLastCall; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class TopologyManagerExportTest { + + /** + * This tests if the topology manager handles a service marked to be exported correctly by exporting it to + * an available RemoteServiceAdmin and notifying an EndpointListener afterwards. 
+ * + * @throws Exception + */ + @Test + public void testServiceExportUnexport() throws Exception { + IMocksControl c = EasyMock.createControl(); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + final EndpointListener notifier = c.createMock(EndpointListener.class); + final ServiceReference sref = createUserService(c); + EndpointDescription epd = createEndpoint(); + expectServiceExported(c, rsa, notifier, sref, epd); + + c.replay(); + EndpointRepository endpointRepo = new EndpointRepository(); + endpointRepo.setNotifier(notifier); + Executor executor = syncExecutor(); + ExportPolicy policy = new DefaultExportPolicy(); + TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor, policy); + exportManager.add(rsa); + exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref)); + c.verify(); + + c.reset(); + notifier.endpointRemoved(epd, null); + expectLastCall().once(); + c.replay(); + exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref)); + c.verify(); + + c.reset(); + c.replay(); + exportManager.serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, sref)); + c.verify(); + + c.reset(); + c.replay(); + exportManager.remove(rsa); + c.verify(); + } + + @Test + public void testExportExisting() throws Exception { + IMocksControl c = EasyMock.createControl(); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class); + final ServiceReference sref = createUserService(c); + expectServiceExported(c, rsa, mockEpListenerNotifier, sref, createEndpoint()); + c.replay(); + + EndpointRepository endpointRepo = new EndpointRepository(); + endpointRepo.setNotifier(mockEpListenerNotifier); + ExportPolicy policy = new DefaultExportPolicy(); + TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy); + exportManager.serviceChanged(new 
ServiceEvent(ServiceEvent.REGISTERED, sref)); + exportManager.add(rsa); + c.verify(); + } + + private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa, + final EndpointListener listener, + final ServiceReference sref, EndpointDescription epd) { + ExportRegistration exportRegistration = createExportRegistration(c, epd); + EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject())) + .andReturn(Collections.singletonList(exportRegistration)).once(); + listener.endpointAdded(epd, null); + EasyMock.expectLastCall().once(); + } + + private Executor syncExecutor() { + return new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + + private ExportRegistration createExportRegistration(IMocksControl c, EndpointDescription endpoint) { + ExportRegistration exportRegistration = c.createMock(ExportRegistration.class); + ExportReference exportReference = c.createMock(ExportReference.class); + EasyMock.expect(exportRegistration.getExportReference()).andReturn(exportReference).anyTimes(); + EasyMock.expect(exportRegistration.getException()).andReturn(null).anyTimes(); + EasyMock.expect(exportReference.getExportedEndpoint()).andReturn(endpoint).anyTimes(); + return exportRegistration; + } + + private EndpointDescription createEndpoint() { + Map<String, Object> props = new HashMap<String, Object>(); + props.put(RemoteConstants.ENDPOINT_ID, "1"); + props.put(Constants.OBJECTCLASS, new String[] {"abc"}); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "cxf"); + return new EndpointDescription(props); + } + + private ServiceReference createUserService(IMocksControl c) { + final ServiceReference sref = c.createMock(ServiceReference.class); + EasyMock.expect(sref.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES))) + .andReturn("*").anyTimes(); + Bundle srefBundle = c.createMock(Bundle.class); + 
EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce(); + EasyMock.expect(sref.getProperty("objectClass")).andReturn("org.My").anyTimes(); + EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce(); + return sref; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java new file mode 100644 index 0000000..c736197 --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import java.util.Dictionary; +import java.util.List; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IMocksControl; +import org.junit.Assert; +import org.junit.Test; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.remoteserviceadmin.EndpointListener; + +public class EndpointListenerImplTest extends Assert { + + int testCase; + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + @Test + public void testScopeChange() { + IMocksControl c = EasyMock.createNiceControl(); + BundleContext bc = c.createMock(BundleContext.class); + TopologyManagerImport tm = c.createMock(TopologyManagerImport.class); + ServiceRegistration sr = c.createMock(ServiceRegistration.class); + + // expect Listener registration + EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class), + EasyMock.anyObject(), + (Dictionary)EasyMock.anyObject())).andReturn(sr).atLeastOnce(); + + sr.setProperties((Dictionary)EasyMock.anyObject()); + + // expect property changes based on later calls + EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { + + public Object answer() throws Throwable { + Object[] args = EasyMock.getCurrentArguments(); + Dictionary props = (Dictionary)args[0]; + List<String> scope = (List<String>)props.get(EndpointListener.ENDPOINT_LISTENER_SCOPE); + switch (testCase) { + case 1: + assertEquals(1, scope.size()); + assertEquals("(a=b)", scope.get(0)); + break; + case 2: + assertEquals(0, scope.size()); + break; + case 3: + assertEquals("adding entry to empty list failed", 1, scope.size()); + assertEquals("(a=b)", scope.get(0)); + break; + case 4: + assertEquals("adding second entry failed", 2, scope.size()); + assertNotNull(scope.contains("(a=b)")); + assertNotNull(scope.contains("(c=d)")); + break; + case 5: + assertEquals("remove failed", 1, scope.size()); + assertEquals("(c=d)", scope.get(0)); + break; + 
default: + assertTrue("This should not happen!", false); + } + return null; + } + }).atLeastOnce(); + + c.replay(); + + EndpointListenerManager endpointListener = new EndpointListenerManager(bc, tm); + + endpointListener.start(); + + testCase = 1; + endpointListener.extendScope("(a=b)"); + testCase = 2; + endpointListener.reduceScope("(a=b)"); + + testCase = 3; + endpointListener.extendScope("(a=b)"); + testCase = 4; + endpointListener.extendScope("(c=d)"); + testCase = 5; + endpointListener.reduceScope("(a=b)"); + + endpointListener.stop(); + + c.verify(); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java new file mode 100644 index 0000000..1e2f90c --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import java.util.Collection; +import java.util.Collections; +import java.util.Dictionary; +import java.util.Hashtable; + +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo; +import org.osgi.service.remoteserviceadmin.RemoteConstants; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ListenerHookImplTest { + + @Test + public void testExtendFilter() throws InvalidSyntaxException { + String filter = "(a=b)"; + BundleContext bc = createBundleContext(); + filter = new ListenerHookImpl(bc, null).extendFilter(filter); + + Filter f = FrameworkUtil.createFilter(filter); + + Dictionary<String, String> m = new Hashtable<String, String>(); + m.put("a", "b"); + assertTrue(filter + " filter must match as uuid is missing", f.match(m)); + m.put(RemoteConstants.ENDPOINT_FRAMEWORK_UUID, "MyUUID"); + assertFalse(filter + " filter must NOT match as uuid is the local one", f.match(m)); + } + + @Test + public void testAddedRemoved() throws InvalidSyntaxException { + IMocksControl c = EasyMock.createControl(); + String filter = "(objectClass=My)"; + BundleContext bc = createBundleContext(); + BundleContext listenerBc = createBundleContext(); + ServiceInterestListener serviceInterestListener = c.createMock(ServiceInterestListener.class); + ListenerHookImpl listenerHook = new ListenerHookImpl(bc, serviceInterestListener); + + ListenerInfo listener = c.createMock(ListenerInfo.class); + EasyMock.expect(listener.getBundleContext()).andReturn(listenerBc); + 
EasyMock.expect(listener.getFilter()).andReturn(filter).atLeastOnce(); + + // Main assertions + serviceInterestListener.addServiceInterest(listenerHook.extendFilter(filter)); + EasyMock.expectLastCall(); + serviceInterestListener.removeServiceInterest(listenerHook.extendFilter(filter)); + EasyMock.expectLastCall(); + + Collection<ListenerInfo> listeners = Collections.singletonList(listener); + + c.replay(); + listenerHook.added(listeners); + listenerHook.removed(listeners); + c.verify(); + } + + private BundleContext createBundleContext() { + BundleContext bc = EasyMock.createNiceMock(BundleContext.class); + EasyMock.expect(bc.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("MyUUID").atLeastOnce(); + EasyMock.replay(bc); + return bc; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java new file mode 100644 index 0000000..3ab78db --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ReferenceCounterTest { + + @Test + public void testCounter() { + ReferenceCounter<String> counter = new ReferenceCounter<String>(); + assertEquals(-1, counter.remove("a")); + assertEquals(-1, counter.remove("a")); + assertEquals(1, counter.add("a")); + assertEquals(2, counter.add("a")); + assertEquals(3, counter.add("a")); + assertEquals(2, counter.remove("a")); + assertEquals(1, counter.remove("a")); + assertEquals(2, counter.add("a")); + assertEquals(1, counter.remove("a")); + assertEquals(0, counter.remove("a")); + assertEquals(-1, counter.remove("a")); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java new file mode 100644 index 0000000..00be969 --- /dev/null +++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.dosgi.topologymanager.importer; + +import java.util.Dictionary; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IMocksControl; +import org.junit.Test; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.ImportReference; +import org.osgi.service.remoteserviceadmin.ImportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; + +import static org.junit.Assert.assertTrue; + +public class TopologyManagerImportTest { + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + @Test + public void testImportForNewlyAddedRSA() throws InterruptedException { + IMocksControl c = EasyMock.createControl(); + + c.makeThreadSafe(true); + + final Semaphore sema = new Semaphore(0); + + ServiceRegistration sreg = c.createMock(ServiceRegistration.class); + sreg.unregister(); + EasyMock.expectLastCall().once(); + + BundleContext bc = c.createMock(BundleContext.class); + EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class), + 
EasyMock.anyObject(), + (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes(); + EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid"); + + EndpointDescription endpoint = c.createMock(EndpointDescription.class); + RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class); + final ImportRegistration ireg = c.createMock(ImportRegistration.class); + EasyMock.expect(ireg.getException()).andReturn(null).anyTimes(); + ImportReference iref = c.createMock(ImportReference.class); + EasyMock.expect(ireg.getImportReference()).andReturn(iref).anyTimes(); + EasyMock.expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes(); + + EasyMock.expect(rsa.importService(EasyMock.eq(endpoint))).andAnswer(new IAnswer<ImportRegistration>() { + public ImportRegistration answer() throws Throwable { + sema.release(); + return ireg; + } + }).once(); + c.replay(); + + TopologyManagerImport tm = new TopologyManagerImport(bc); + tm.start(); + tm.endpointAdded(endpoint, "myFilter"); + tm.add(rsa); + assertTrue("rsa.ImportService should have been called", + sema.tryAcquire(100, TimeUnit.SECONDS)); + tm.stop(); + c.verify(); + } +}
