This is an automated email from the ASF dual-hosted git repository. amichai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 0b94606ef815d6a8ceea72fe39ef8cc8aa2261a0 Author: Amichai Rothman <[email protected]> AuthorDate: Mon Jun 3 16:26:14 2024 +0300 Convert tabs to spaces --- LICENSE | 36 +-- discovery/local/bnd.bnd | 2 +- .../aries/rsa/discovery/local/LocalDiscovery.java | 8 +- .../apache/aries/rsa/discovery/mdns/Interest.java | 134 ++++---- .../aries/rsa/discovery/mdns/InterestManager.java | 166 +++++----- .../aries/rsa/discovery/mdns/MdnsDiscovery.java | 350 ++++++++++----------- .../discovery/mdns/PublishingEndpointListener.java | 206 ++++++------ itests/tck/tck.bndrun | 34 +- provider/tcp/bnd.bnd | 4 +- 9 files changed, 470 insertions(+), 470 deletions(-) diff --git a/LICENSE b/LICENSE index 2654c436..a00e56bb 100644 --- a/LICENSE +++ b/LICENSE @@ -202,24 +202,24 @@ limitations under the License. - ====================================== - LICENSES FOR INCLUDED DEPENDENCIES - ====================================== - - All the source code for the Aries JPA project is released under the - license above. Additionally, the Aries JPA binary distribution - includes a number of third-party files that are required in - order to the software to function. Unless noted below, these jars - and resource files are also released under the ASF license above. - - The exceptions are as follows: - - =========================== - persistence-xsd.rsrc - included in the org.apache.aries.jpa.container jar, taken from: - http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd) - persistence_2_0-xsd.rsrc - included in the org.apache.aries.jpa.container jar, taken from: - http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd) - =========================== + ====================================== + LICENSES FOR INCLUDED DEPENDENCIES + ====================================== + + All the source code for the Aries JPA project is released under the + license above. Additionally, the Aries JPA binary distribution + includes a number of third-party files that are required in + order to the software to function. Unless noted below, these jars + and resource files are also released under the ASF license above. + + The exceptions are as follows: + + =========================== + persistence-xsd.rsrc - included in the org.apache.aries.jpa.container jar, taken from: + http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd) + persistence_2_0-xsd.rsrc - included in the org.apache.aries.jpa.container jar, taken from: + http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd) + =========================== COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0 diff --git a/discovery/local/bnd.bnd b/discovery/local/bnd.bnd index 30698353..542135a3 100644 --- a/discovery/local/bnd.bnd +++ b/discovery/local/bnd.bnd @@ -16,4 +16,4 @@ # specific language governing permissions and limitations # under the License. Export-Package: \ - org.osgi.xmlns.rsa.v1_0;version=1.0.0 + org.osgi.xmlns.rsa.v1_0;version=1.0.0 diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java index 5df81bd8..89cae9df 100644 --- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java +++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java @@ -130,10 +130,10 @@ public class LocalDiscovery implements BundleListener { void updatedListener(ServiceReference<EndpointEventListener> endpointListenerRef, EndpointEventListener endpointListener) { // if service properties have been updated, the filter (scope) // might have changed so we remove and re-add the listener - // TODO fix this so that we don't: - // 1. remove and add when there is no change - // 2. remove and add instead of modifying - // 3. remove instead of modified end match + // TODO fix this so that we don't: + // 1. remove and add when there is no change + // 2. remove and add instead of modifying + // 3. remove instead of modified end match synchronized (listenerToFilters) { unbindListener(endpointListener); bindListener(endpointListenerRef, endpointListener); diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java index 530751d9..ef0c7953 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java @@ -49,41 +49,41 @@ public class Interest { public Interest(Long id, EndpointEventListener epListener, Map<String, Object> props) { - this.id = id; + this.id = id; this.scopes.set(StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE))); this.epListener = epListener; } public void update(Map<String, Object> props) { - - List<String> newScopes = StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE)); - List<String> oldScopes = this.scopes.getAndSet(newScopes); - - added.values().removeIf(ed -> { - Optional<String> newScope = getFirstMatch(ed, newScopes); - Optional<String> oldScope = getFirstMatch(ed, oldScopes); - EndpointEvent event; - boolean remove; - String filter; - if(newScope.isPresent()) { - remove = false; - filter = newScope.get(); - if(oldScope.isPresent() && oldScope.get().equals(filter)) { - event = null; - } else { - event = new EndpointEvent(MODIFIED, ed); - } - } else { - remove = true; - event = new EndpointEvent(REMOVED, ed); - filter = oldScope.orElse(null); - } + + List<String> newScopes = StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE)); + List<String> oldScopes = this.scopes.getAndSet(newScopes); + + added.values().removeIf(ed -> { + Optional<String> newScope = getFirstMatch(ed, newScopes); + Optional<String> oldScope = getFirstMatch(ed, oldScopes); + EndpointEvent event; + boolean remove; + String filter; + if(newScope.isPresent()) { + remove = false; + filter = newScope.get(); + if(oldScope.isPresent() && oldScope.get().equals(filter)) { + event = null; + } else { + event = new EndpointEvent(MODIFIED, ed); + } + } else { + remove = true; + event = new EndpointEvent(REMOVED, ed); + filter = oldScope.orElse(null); + } - if (event != null) - notifyListener(event, filter); - - return remove; - }); + if (event != null) + notifyListener(event, filter); + + return remove; + }); } public Object getEpListener() { @@ -91,57 +91,57 @@ public class Interest { } public void endpointChanged(EndpointDescription ed) { - List<String> scopes = this.scopes.get(); - Optional<String> currentScope = getFirstMatch(ed, scopes); - boolean alreadyAdded = added.containsKey(ed.getId()); - EndpointEvent event; - String filter; + List<String> scopes = this.scopes.get(); + Optional<String> currentScope = getFirstMatch(ed, scopes); + boolean alreadyAdded = added.containsKey(ed.getId()); + EndpointEvent event; + String filter; if (currentScope.isPresent()) { - if(LOG.isDebugEnabled()) { - LOG.debug("Listener {} is interested in endpoint {}. It will be {}", id, ed, alreadyAdded ? "MODIFIED" : "ADDED"); - } - added.put(ed.getId(), ed); - event = new EndpointEvent(alreadyAdded ? MODIFIED : ADDED, ed); - filter = currentScope.get(); + if(LOG.isDebugEnabled()) { + LOG.debug("Listener {} is interested in endpoint {}. It will be {}", id, ed, alreadyAdded ? "MODIFIED" : "ADDED"); + } + added.put(ed.getId(), ed); + event = new EndpointEvent(alreadyAdded ? MODIFIED : ADDED, ed); + filter = currentScope.get(); } else if(alreadyAdded) { - if(LOG.isDebugEnabled()) { - LOG.debug("Listener {} is no longer interested in endpoint {}. It will be {}", id, ed, "MODIFIED"); - } - EndpointDescription previous = added.remove(ed.getId()); - event = new EndpointEvent(MODIFIED_ENDMATCH, ed); - filter = getFirstMatch(previous, scopes).orElse(null); + if(LOG.isDebugEnabled()) { + LOG.debug("Listener {} is no longer interested in endpoint {}. It will be {}", id, ed, "MODIFIED"); + } + EndpointDescription previous = added.remove(ed.getId()); + event = new EndpointEvent(MODIFIED_ENDMATCH, ed); + filter = getFirstMatch(previous, scopes).orElse(null); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Listener {} not interested in endpoint {}", id, ed); - } - return; + if(LOG.isDebugEnabled()) { + LOG.debug("Listener {} not interested in endpoint {}", id, ed); + } + return; } - notifyListener(event, filter); + notifyListener(event, filter); } - public void endpointRemoved(String id) { - EndpointDescription previous = added.remove(id); - if(previous != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Endpoint {} is no longer available for listener {}", id, this.id); - } - notifyListener(new EndpointEvent(REMOVED, previous), getFirstMatch(previous, scopes.get()).orElse(null)); - } - } + public void endpointRemoved(String id) { + EndpointDescription previous = added.remove(id); + if(previous != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Endpoint {} is no longer available for listener {}", id, this.id); + } + notifyListener(new EndpointEvent(REMOVED, previous), getFirstMatch(previous, scopes.get()).orElse(null)); + } + } - private void notifyListener(EndpointEvent event, String filter) { - EndpointDescription endpoint = event.getEndpoint(); - LOG.info("Calling endpointChanged on class {} for filter {}, type {}, endpoint {} ", - epListener, filter, event.getType(), endpoint); - epListener.endpointChanged(event, filter); - } + private void notifyListener(EndpointEvent event, String filter) { + EndpointDescription endpoint = event.getEndpoint(); + LOG.info("Calling endpointChanged on class {} for filter {}, type {}, endpoint {} ", + epListener, filter, event.getType(), endpoint); + epListener.endpointChanged(event, filter); + } private Optional<String> getFirstMatch(EndpointDescription endpoint, List<String> scopes) { return scopes.stream().filter(endpoint::matches).findFirst(); } - @Override + @Override public String toString() { return "Interest [scopes=" + scopes + ", epListener=" + epListener.getClass() + "]"; } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java index 3ba9bf99..473416d0 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java @@ -64,90 +64,90 @@ public class InterestManager { private final ConcurrentMap<String, SseEventSource> streams = new ConcurrentHashMap<>(); public InterestManager(SseEventSourceFactory factory, EndpointDescriptionParser parser, Client client) { - - this.eventSourceFactory = factory; - this.parser = parser; - this.client = client; - + + this.eventSourceFactory = factory; + this.parser = parser; + this.client = client; + } public void deactivate() { - streams.values().forEach(SseEventSource::close); - streams.clear(); - + streams.values().forEach(SseEventSource::close); + streams.clear(); + interests.clear(); } public void remoteAdded(String uri) { - if(streams.containsKey(uri)) { - return; - } - - if(LOG.isInfoEnabled()) { - LOG.info("Discovered a remote at {}", uri); - } - - SseEventSource sse = eventSourceFactory.newBuilder(client.target(uri)).build(); - sse.register(i -> onEndpointEvent(uri, i), t -> lostRemoteStream(uri, t), () -> lostRemoteStream(uri, null)); - streams.put(uri, sse); - sse.open(); + if(streams.containsKey(uri)) { + return; + } + + if(LOG.isInfoEnabled()) { + LOG.info("Discovered a remote at {}", uri); + } + + SseEventSource sse = eventSourceFactory.newBuilder(client.target(uri)).build(); + sse.register(i -> onEndpointEvent(uri, i), t -> lostRemoteStream(uri, t), () -> lostRemoteStream(uri, null)); + streams.put(uri, sse); + sse.open(); } public void remoteRemoved(String uri) { - if(LOG.isInfoEnabled()) { - LOG.info("Remote at {} is no longer present", uri); - } - - SseEventSource sseEventSource = streams.remove(uri); - if(sseEventSource != null) { - sseEventSource.close(); - } + if(LOG.isInfoEnabled()) { + LOG.info("Remote at {} is no longer present", uri); + } + + SseEventSource sseEventSource = streams.remove(uri); + if(sseEventSource != null) { + sseEventSource.close(); + } } private void onEndpointEvent(String source, InboundSseEvent event) { - String name = event.getName(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Received a {} notification from {}", name, source); - } - - if(ENDPOINT_UPDATED.equals(name)) { - EndpointDescription ed = parser.readEndpoint(event.readData(InputStream.class)); - endpointsBySource.compute(source, (a,b) -> { - return b == null ? singleton(ed) : concat(b.stream(), Stream.of(ed)).collect(toSet()); - }); - interests.values().forEach(i -> i.endpointChanged(ed)); - } else if (ENDPOINT_REVOKED.equals(name)) { - String id = event.readData(); - endpointsBySource.compute(source, (a,b) -> { - if(b == null) { - return null; - } else { - Set<EndpointDescription> set = b.stream().filter(ed -> !ed.getId().equals(id)).collect(toSet()); - return set.isEmpty() ? null : set; - } - }); - interests.values().forEach(i -> i.endpointRemoved(id)); - } + String name = event.getName(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Received a {} notification from {}", name, source); + } + + if(ENDPOINT_UPDATED.equals(name)) { + EndpointDescription ed = parser.readEndpoint(event.readData(InputStream.class)); + endpointsBySource.compute(source, (a,b) -> { + return b == null ? singleton(ed) : concat(b.stream(), Stream.of(ed)).collect(toSet()); + }); + interests.values().forEach(i -> i.endpointChanged(ed)); + } else if (ENDPOINT_REVOKED.equals(name)) { + String id = event.readData(); + endpointsBySource.compute(source, (a,b) -> { + if(b == null) { + return null; + } else { + Set<EndpointDescription> set = b.stream().filter(ed -> !ed.getId().equals(id)).collect(toSet()); + return set.isEmpty() ? null : set; + } + }); + interests.values().forEach(i -> i.endpointRemoved(id)); + } } private void lostRemoteStream(String source, Throwable t) { - - if(t != null) { - if(LOG.isWarnEnabled()) { - LOG.warn("The remote {} had a failure", source, t); - } - } else { - if(LOG.isInfoEnabled()) { - LOG.info("The remote {} has disconnected", source); - } - } - - Set<EndpointDescription> remove = endpointsBySource.remove(source); - if(remove != null) { - remove.forEach(ed -> interests.values().forEach(i -> i.endpointRemoved(ed.getId()))); - } + + if(t != null) { + if(LOG.isWarnEnabled()) { + LOG.warn("The remote {} had a failure", source, t); + } + } else { + if(LOG.isInfoEnabled()) { + LOG.info("The remote {} has disconnected", source); + } + } + + Set<EndpointDescription> remove = endpointsBySource.remove(source); + if(remove != null) { + remove.forEach(ed -> interests.values().forEach(i -> i.endpointRemoved(ed.getId()))); + } } public void bindEndpointEventListener(EndpointEventListener epListener, Map<String, Object> props) { @@ -162,18 +162,18 @@ public class InterestManager { interests.remove(getServiceId(props)); } - private Long getServiceId(Map<String, Object> props) { - return (Long) props.get("service.id"); - } + private Long getServiceId(Map<String, Object> props) { + return (Long) props.get("service.id"); + } private void addInterest(EndpointEventListener epListener, Map<String, Object> props) { - - Long id = getServiceId(props); - - if(LOG.isInfoEnabled()) { - LOG.info("Service {} has registered an interest in endpoint events", id); - } - + + Long id = getServiceId(props); + + if(LOG.isInfoEnabled()) { + LOG.info("Service {} has registered an interest in endpoint events", id); + } + Interest interest = new Interest(getServiceId(props), epListener, props); interests.put(getServiceId(props), interest); @@ -183,13 +183,13 @@ public class InterestManager { } private void updatedInterest(Map<String, Object> props) { - + Long id = getServiceId(props); - - if(LOG.isInfoEnabled()) { - LOG.info("Service {} has changed its interest in endpoint events", id); - } - + + if(LOG.isInfoEnabled()) { + LOG.info("Service {} has changed its interest in endpoint events", id); + } + interests.get(id).update(props); } } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java index 441fe7b2..c6034635 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java @@ -59,31 +59,31 @@ import org.slf4j.LoggerFactory; @Component public class MdnsDiscovery { - private static final String _ARIES_DISCOVERY_HTTP_TCP_LOCAL = "_aries-discovery._tcp.local."; + private static final String _ARIES_DISCOVERY_HTTP_TCP_LOCAL = "_aries-discovery._tcp.local."; - private static final Logger LOG = LoggerFactory.getLogger(MdnsDiscovery.class); - - private final Client client; - - private final String fwUuid; - - private final InterestManager interestManager; - - private final PublishingEndpointListener publishingListener; - - private JaxrsServiceRuntime runtime; - - private JmDNS jmdns; + private static final Logger LOG = LoggerFactory.getLogger(MdnsDiscovery.class); + + private final Client client; + + private final String fwUuid; + + private final InterestManager interestManager; + + private final PublishingEndpointListener publishingListener; + + private JaxrsServiceRuntime runtime; + + private JmDNS jmdns; - @Activate - public MdnsDiscovery(BundleContext ctx, @Reference SseEventSourceFactory eventSourceFactory, - @Reference ClientBuilder clientBuilder, @Reference EndpointDescriptionParser parser) { - this.client = clientBuilder.build(); - this.interestManager = new InterestManager(eventSourceFactory, parser, client); - fwUuid = ctx.getProperty(FRAMEWORK_UUID); - this.publishingListener = new PublishingEndpointListener(parser, ctx, fwUuid); - } + @Activate + public MdnsDiscovery(BundleContext ctx, @Reference SseEventSourceFactory eventSourceFactory, + @Reference ClientBuilder clientBuilder, @Reference EndpointDescriptionParser parser) { + this.client = clientBuilder.build(); + this.interestManager = new InterestManager(eventSourceFactory, parser, client); + fwUuid = ctx.getProperty(FRAMEWORK_UUID); + this.publishingListener = new PublishingEndpointListener(parser, ctx, fwUuid); + } @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) public void bindEndpointEventListener(EndpointEventListener epListener, Map<String, Object> props) { @@ -100,174 +100,174 @@ public class MdnsDiscovery { @Reference(policy = ReferencePolicy.DYNAMIC) public void bindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { - updateAndRegister(runtime); + updateAndRegister(runtime); } - public void updatedJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { - updateAndRegister(runtime); - } + public void updatedJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { + updateAndRegister(runtime); + } - public void unbindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { - JmDNS jmdns = null; - synchronized (this) { - if(runtime == this.runtime) { - jmdns = this.jmdns; - this.runtime = null; - } - } - - if(jmdns != null) { - jmdns.unregisterAllServices(); - } - } + public void unbindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) { + JmDNS jmdns = null; + synchronized (this) { + if(runtime == this.runtime) { + jmdns = this.jmdns; + this.runtime = null; + } + } + + if(jmdns != null) { + jmdns.unregisterAllServices(); + } + } - private void updateAndRegister(JaxrsServiceRuntime runtime) { - JmDNS jmdns; - synchronized (this) { - this.runtime = runtime; - jmdns = this.jmdns; - } - - if(jmdns != null) { - RuntimeDTO runtimeDTO = runtime.getRuntimeDTO(); - List<String> uris = StringPlus.normalize(runtimeDTO.serviceDTO.properties.get(JAX_RS_SERVICE_ENDPOINT)); - - if(uris == null || uris.isEmpty()) { - LOG.warn("Unable to advertise discovery as there are no endpoint URIs"); - return; - } - - String base = runtimeDTO.defaultApplication.base; - if(base == null) { - base = ""; - } - - base += "/aries/rsa/discovery"; - - URI uri = uris.stream() - .filter(s -> s.matches(".*(?:[0-9]{1,3}\\.){3}[0-9]{1,3}.*")) - .findFirst() - .map(URI::create) - .orElseGet(() -> URI.create(uris.get(0))); - - Map<String, Object> props = new HashMap<>(); - props.put("scheme", uri.getScheme() == null ? "" : uri.getScheme()); - props.put("path", uri.getPath() == null ? base : uri.getPath() + base); - props.put("frameworkUuid", fwUuid); - - ServiceInfo info = ServiceInfo.create(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, fwUuid, uri.getPort(), 0, 0, props); - - try { - jmdns.registerService(info); - } catch (IOException ioe) { - LOG.error("Unable to advertise discovery", ioe); - } - } - } + private void updateAndRegister(JaxrsServiceRuntime runtime) { + JmDNS jmdns; + synchronized (this) { + this.runtime = runtime; + jmdns = this.jmdns; + } + + if(jmdns != null) { + RuntimeDTO runtimeDTO = runtime.getRuntimeDTO(); + List<String> uris = StringPlus.normalize(runtimeDTO.serviceDTO.properties.get(JAX_RS_SERVICE_ENDPOINT)); + + if(uris == null || uris.isEmpty()) { + LOG.warn("Unable to advertise discovery as there are no endpoint URIs"); + return; + } + + String base = runtimeDTO.defaultApplication.base; + if(base == null) { + base = ""; + } + + base += "/aries/rsa/discovery"; + + URI uri = uris.stream() + .filter(s -> s.matches(".*(?:[0-9]{1,3}\\.){3}[0-9]{1,3}.*")) + .findFirst() + .map(URI::create) + .orElseGet(() -> URI.create(uris.get(0))); + + Map<String, Object> props = new HashMap<>(); + props.put("scheme", uri.getScheme() == null ? "" : uri.getScheme()); + props.put("path", uri.getPath() == null ? base : uri.getPath() + base); + props.put("frameworkUuid", fwUuid); + + ServiceInfo info = ServiceInfo.create(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, fwUuid, uri.getPort(), 0, 0, props); + + try { + jmdns.registerService(info); + } catch (IOException ioe) { + LOG.error("Unable to advertise discovery", ioe); + } + } + } public static @interface Config { - public String bind_address(); + public String bind_address(); } @Activate public void start(Config config) throws UnknownHostException, IOException { - String bind = config.bind_address(); - - JmDNS jmdns = JmDNS.create(bind == null ? null : InetAddress.getByName(bind)); - - JaxrsServiceRuntime runtime; - synchronized (this) { - this.jmdns = jmdns; - runtime = this.runtime; - } - - if(runtime != null) { - updateAndRegister(runtime); - } - - // Add a service listener - jmdns.addServiceListener(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, new MdnsListener()); - + String bind = config.bind_address(); + + JmDNS jmdns = JmDNS.create(bind == null ? null : InetAddress.getByName(bind)); + + JaxrsServiceRuntime runtime; + synchronized (this) { + this.jmdns = jmdns; + runtime = this.runtime; + } + + if(runtime != null) { + updateAndRegister(runtime); + } + + // Add a service listener + jmdns.addServiceListener(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, new MdnsListener()); + } @Deactivate public void stop () { - try { - jmdns.close(); - } catch (IOException e) { - LOG.warn("An exception occurred closing the mdns discovery"); - } - - interestManager.deactivate(); - publishingListener.stop(); + try { + jmdns.close(); + } catch (IOException e) { + LOG.warn("An exception occurred closing the mdns discovery"); + } + + interestManager.deactivate(); + publishingListener.stop(); } - private class MdnsListener implements ServiceListener { - - private final ConcurrentMap<String, String> namesToUris = new ConcurrentHashMap<>(); - - @Override - public void serviceAdded(ServiceEvent event) { - } - - @Override - public void serviceRemoved(ServiceEvent event) { - ServiceInfo info = event.getInfo(); - if(info != null) { - String removed = namesToUris.remove(info.getKey()); - if(removed != null) { - interestManager.remoteRemoved(removed); - } - } - } - - @Override - public void serviceResolved(ServiceEvent event) { - ServiceInfo info = event.getInfo(); - - String infoUuid = info.getPropertyString("frameworkUuid"); - - if(infoUuid == null || infoUuid.equals(fwUuid)) { - // Ignore until we can see if this is for our own endpoint - return; - } - - String scheme = info.getPropertyString("scheme"); - if(scheme == null) { - scheme = "http"; - } - - String path = info.getPropertyString("path"); - if(path == null) { - // Not a complete record yet - return; - } - - int port = info.getPort(); - if(port == -1) { - switch(scheme) { - case "http": - port = 80; - break; - case "https": - port = 443; - break; - default: - LOG.error("Unknown URI scheme advertised {} by framework {} on host {}", - scheme, info.getName(), info.getDomain()); - } - } - - String address = info.getInetAddresses()[0].getHostAddress(); - - String uri = String.format("%s://%s:%d/%s", scheme, address, port, path); - - LOG.info("Discovered remote at {}", uri); - - namesToUris.put(info.getKey(), uri); - - interestManager.remoteAdded(uri); - } - } + private class MdnsListener implements ServiceListener { + + private final ConcurrentMap<String, String> namesToUris = new ConcurrentHashMap<>(); + + @Override + public void serviceAdded(ServiceEvent event) { + } + + @Override + public void serviceRemoved(ServiceEvent event) { + ServiceInfo info = event.getInfo(); + if(info != null) { + String removed = namesToUris.remove(info.getKey()); + if(removed != null) { + interestManager.remoteRemoved(removed); + } + } + } + + @Override + public void serviceResolved(ServiceEvent event) { + ServiceInfo info = event.getInfo(); + + String infoUuid = info.getPropertyString("frameworkUuid"); + + if(infoUuid == null || infoUuid.equals(fwUuid)) { + // Ignore until we can see if this is for our own endpoint + return; + } + + String scheme = info.getPropertyString("scheme"); + if(scheme == null) { + scheme = "http"; + } + + String path = info.getPropertyString("path"); + if(path == null) { + // Not a complete record yet + return; + } + + int port = info.getPort(); + if(port == -1) { + switch(scheme) { + case "http": + port = 80; + break; + case "https": + port = 443; + break; + default: + LOG.error("Unknown URI scheme advertised {} by framework {} on host {}", + scheme, info.getName(), info.getDomain()); + } + } + + String address = info.getInetAddresses()[0].getHostAddress(); + + String uri = String.format("%s://%s:%d/%s", scheme, address, port, path); + + LOG.info("Discovered remote at {}", uri); + + namesToUris.put(info.getKey(), uri); + + interestManager.remoteAdded(uri); + } + } } diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java index 46872fc0..d78a5824 100644 --- a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java +++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java @@ -76,14 +76,14 @@ public class PublishingEndpointListener { private final Set<Subscription> listeners = ConcurrentHashMap.newKeySet(); @SuppressWarnings("serial") - public PublishingEndpointListener(EndpointDescriptionParser parser, BundleContext bctx, String uuid) { + public PublishingEndpointListener(EndpointDescriptionParser parser, BundleContext bctx, String uuid) { this.parser = parser; - this.uuid = uuid; + this.uuid = uuid; String[] ifAr = { EndpointEventListener.class.getName() }; Dictionary<String, Object> props = serviceProperties(uuid); listenerReg = bctx.registerService(ifAr, new ListenerFactory(), props); resourceReg = bctx.registerService(PublishingEndpointListener.class, this, - new Hashtable<String, Object>() {{put("osgi.jaxrs.resource", Boolean.TRUE);}}); + new Hashtable<String, Object>() {{put("osgi.jaxrs.resource", Boolean.TRUE);}}); } @Deactivate @@ -94,41 +94,41 @@ public class PublishingEndpointListener { } private void endpointUpdate(Long bundleId, EndpointDescription ed, int type) { - String edFwUuid = ed.getFrameworkUUID(); - if(edFwUuid == null || !edFwUuid.equals(uuid)) { - LOG.warn("This listener has been called with an endpoint {} for a remote framework {}", ed.getId(), edFwUuid); - return; - } - String id = ed.getId(); - switch(type) { - case EndpointEvent.ADDED: - case EndpointEvent.MODIFIED: - localEndpoints.compute(id, (k,v) -> { - return v == null ? new SponsoredEndpoint(ed, singleton(bundleId)) : - new SponsoredEndpoint(ed, concat(v.sponsors.stream(), Stream.of(bundleId)).collect(toSet())); - }); - String data = toEndpointData(ed); - listeners.forEach(s -> s.update(data)); - break; - case EndpointEvent.MODIFIED_ENDMATCH: - case EndpointEvent.REMOVED: - boolean act = localEndpoints.compute(id, (k,v) -> { - if(v == null) { - return null; - } else { - Set<Long> updated = v.sponsors.stream().filter(l -> !bundleId.equals(l)).collect(toSet()); - return updated.isEmpty() ? null : new SponsoredEndpoint(v.ed, updated); - } - }) == null; - - if(act) { - listeners.forEach(s -> s.revoke(id)); - } - break; - default: - LOG.error("Unknown event type {} for endpoint {}", type, ed); - } - } + String edFwUuid = ed.getFrameworkUUID(); + if(edFwUuid == null || !edFwUuid.equals(uuid)) { + LOG.warn("This listener has been called with an endpoint {} for a remote framework {}", ed.getId(), edFwUuid); + return; + } + String id = ed.getId(); + switch(type) { + case EndpointEvent.ADDED: + case EndpointEvent.MODIFIED: + localEndpoints.compute(id, (k,v) -> { + return v == null ? new SponsoredEndpoint(ed, singleton(bundleId)) : + new SponsoredEndpoint(ed, concat(v.sponsors.stream(), Stream.of(bundleId)).collect(toSet())); + }); + String data = toEndpointData(ed); + listeners.forEach(s -> s.update(data)); + break; + case EndpointEvent.MODIFIED_ENDMATCH: + case EndpointEvent.REMOVED: + boolean act = localEndpoints.compute(id, (k,v) -> { + if(v == null) { + return null; + } else { + Set<Long> updated = v.sponsors.stream().filter(l -> !bundleId.equals(l)).collect(toSet()); + return updated.isEmpty() ? null : new SponsoredEndpoint(v.ed, updated); + } + }) == null; + + if(act) { + listeners.forEach(s -> s.revoke(id)); + } + break; + default: + LOG.error("Unknown event type {} for endpoint {}", type, ed); + } + } private Dictionary<String, Object> serviceProperties(String uuid) { String scope = String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, @@ -139,60 +139,60 @@ public class PublishingEndpointListener { } private String toEndpointData(EndpointDescription ed) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - parser.writeEndpoint(ed, baos); - return new String(baos.toByteArray(), StandardCharsets.UTF_8).replace("\n", "").replace("\r", ""); - } catch (Exception e) { - LOG.error("Unable to serialize the endpoint {}", ed, e); - throw new RuntimeException(e); - } + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + parser.writeEndpoint(ed, baos); + return new String(baos.toByteArray(), StandardCharsets.UTF_8).replace("\n", "").replace("\r", ""); + } catch (Exception e) { + LOG.error("Unable to serialize the endpoint {}", ed, e); + throw new RuntimeException(e); + } } @GET - @Produces(SERVER_SENT_EVENTS) - @Path("aries/rsa/discovery") - public void listen(@Context Sse sse, @Context SseEventSink sink) { - Subscription subscription = new Subscription(sse, sink); - listeners.add(subscription); - - localEndpoints.values().stream() - .map(s -> toEndpointData(s.ed)) - .forEach(subscription::update); - } - - private class ListenerFactory implements ServiceFactory<PerClientEndpointEventListener> { - - @Override - public PerClientEndpointEventListener getService(Bundle bundle, - ServiceRegistration<PerClientEndpointEventListener> registration) { - return new PerClientEndpointEventListener(bundle.getBundleId()); - } - - @Override - public void ungetService(Bundle bundle, ServiceRegistration<PerClientEndpointEventListener> registration, - PerClientEndpointEventListener service) { - Long bundleId = service.bundleId; - localEndpoints.values().stream() - .filter(s -> s.sponsors.contains(bundleId)) - .forEach(s -> endpointUpdate(bundleId, s.ed, EndpointEvent.REMOVED)); - } - + @Produces(SERVER_SENT_EVENTS) + @Path("aries/rsa/discovery") + public void listen(@Context Sse sse, @Context SseEventSink sink) { + Subscription subscription = new Subscription(sse, sink); + listeners.add(subscription); + + localEndpoints.values().stream() + .map(s -> toEndpointData(s.ed)) + .forEach(subscription::update); + } + + private class ListenerFactory implements ServiceFactory<PerClientEndpointEventListener> { + + @Override + public PerClientEndpointEventListener getService(Bundle bundle, + ServiceRegistration<PerClientEndpointEventListener> registration) { + return new PerClientEndpointEventListener(bundle.getBundleId()); + } + + @Override + public void ungetService(Bundle bundle, ServiceRegistration<PerClientEndpointEventListener> registration, + PerClientEndpointEventListener service) { + Long bundleId = service.bundleId; + localEndpoints.values().stream() + .filter(s -> s.sponsors.contains(bundleId)) + .forEach(s -> endpointUpdate(bundleId, s.ed, EndpointEvent.REMOVED)); + } + } private class PerClientEndpointEventListener implements EndpointEventListener { - - private final Long bundleId; - - public PerClientEndpointEventListener(Long bundleId) { - super(); - this.bundleId = bundleId; - } - - @Override - public void endpointChanged(EndpointEvent event, String filter) { - endpointUpdate(bundleId, event.getEndpoint(), event.getType()); - } + + private final Long bundleId; + + public PerClientEndpointEventListener(Long bundleId) { + super(); + this.bundleId = bundleId; + } + + @Override + public void endpointChanged(EndpointEvent event, String filter) { + endpointUpdate(bundleId, event.getEndpoint(), event.getType()); + } } class Subscription { @@ -204,11 +204,11 @@ public class PublishingEndpointListener { SseEventSink eventSink; public Subscription(Sse sse, SseEventSink eventSink) { - this.sse = sse; - this.eventSink = eventSink; - } + this.sse = sse; + this.eventSink = eventSink; + } - public void update(String endpointData) { + public void update(String endpointData) { eventSink.send(sse.newEvent(ENDPOINT_UPDATED, endpointData)) .whenComplete(this::sendFailure); } @@ -219,27 +219,27 @@ public class PublishingEndpointListener { } public void close() { - eventSink.close(); - listeners.remove(this); + eventSink.close(); + listeners.remove(this); } private void sendFailure(Object o, Throwable t) { if(t != null) { - LOG.error("Failed to send endpoint message, closing"); - listeners.remove(this); - eventSink.close(); + LOG.error("Failed to send endpoint message, closing"); + listeners.remove(this); + eventSink.close(); } } } private static class SponsoredEndpoint { - private final EndpointDescription ed; - private final Set<Long> sponsors; - - public SponsoredEndpoint(EndpointDescription ed, Set<Long> sponsors) { - super(); - this.ed = ed; - this.sponsors = sponsors; - } + private final EndpointDescription ed; + private final Set<Long> sponsors; + + public SponsoredEndpoint(EndpointDescription ed, Set<Long> sponsors) { + super(); + this.ed = ed; + this.sponsors = sponsors; + } } } diff --git a/itests/tck/tck.bndrun b/itests/tck/tck.bndrun index 31b1e752..a41b4ca7 100644 --- a/itests/tck/tck.bndrun +++ b/itests/tck/tck.bndrun @@ -40,22 +40,22 @@ -runfw: org.eclipse.osgi;version='[3.8.0.v20120529-1548,3.8.0.v20120529-1548]' -runee: JavaSE-1.8 -runrequires: \ - osgi.identity;filter:='(osgi.identity=org.osgi.test.cases.remoteserviceadmin)',\ - osgi.identity;filter:='(osgi.identity=org.apache.aries.rsa.topology-manager)',\ - osgi.identity;filter:='(osgi.identity=org.ops4j.pax.logging.pax-logging-service)' - + osgi.identity;filter:='(osgi.identity=org.osgi.test.cases.remoteserviceadmin)',\ + osgi.identity;filter:='(osgi.identity=org.apache.aries.rsa.topology-manager)',\ + osgi.identity;filter:='(osgi.identity=org.ops4j.pax.logging.pax-logging-service)' + -runblacklist: \ - osgi.identity;filter:='(osgi.identity=osgi.cmpn)',\ - osgi.identity;filter:='(osgi.identity=slf4j.api)' + osgi.identity;filter:='(osgi.identity=osgi.cmpn)',\ + osgi.identity;filter:='(osgi.identity=slf4j.api)' -runbundles: \ - org.apache.aries.rsa.core;version='[1.10.0,1.10.1)',\ - org.apache.aries.rsa.discovery.config;version='[1.10.0,1.10.1)',\ - org.apache.aries.rsa.provider.tcp;version='[1.10.0,1.10.1)',\ - org.apache.aries.rsa.spi;version='[1.10.0,1.10.1)',\ - org.apache.aries.rsa.topology-manager;version='[1.10.0,1.10.1)',\ - org.apache.felix.configadmin;version='[1.9.26,1.9.27)',\ - org.apache.felix.eventadmin;version='[1.6.4,1.6.5)',\ - org.apache.servicemix.bundles.junit;version='[3.8.2,3.8.3)',\ - org.osgi.test.cases.remoteserviceadmin;version='[5.0.0,5.0.1)',\ - org.ops4j.pax.logging.pax-logging-api;version='[1.11.2,1.11.3)',\ - org.ops4j.pax.logging.pax-logging-service;version='[1.11.2,1.11.3)' + org.apache.aries.rsa.core;version='[1.10.0,1.10.1)',\ + org.apache.aries.rsa.discovery.config;version='[1.10.0,1.10.1)',\ + org.apache.aries.rsa.provider.tcp;version='[1.10.0,1.10.1)',\ + org.apache.aries.rsa.spi;version='[1.10.0,1.10.1)',\ + org.apache.aries.rsa.topology-manager;version='[1.10.0,1.10.1)',\ + org.apache.felix.configadmin;version='[1.9.26,1.9.27)',\ + org.apache.felix.eventadmin;version='[1.6.4,1.6.5)',\ + org.apache.servicemix.bundles.junit;version='[3.8.2,3.8.3)',\ + org.osgi.test.cases.remoteserviceadmin;version='[5.0.0,5.0.1)',\ + org.ops4j.pax.logging.pax-logging-api;version='[1.11.2,1.11.3)',\ + org.ops4j.pax.logging.pax-logging-service;version='[1.11.2,1.11.3)' diff --git a/provider/tcp/bnd.bnd b/provider/tcp/bnd.bnd index d07bc353..5fb3a1c9 100644 --- a/provider/tcp/bnd.bnd +++ b/provider/tcp/bnd.bnd @@ -16,5 +16,5 @@ # specific language governing permissions and limitations # under the License. Private-Package: \ - org.apache.aries.rsa.util,\ - org.osgi.util.converter + org.apache.aries.rsa.util,\ + org.osgi.util.converter
