This is an automated email from the ASF dual-hosted git repository. timothyjward pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
commit 07b5f41a8e5d8459801d7703a71f887a659ac55c Author: Tim Ward <[email protected]> AuthorDate: Thu Oct 1 13:01:13 2020 +0100 Initial support for remote events using OSGi Remote Services --- .../typedevent/bus/impl/TypedEventBusImpl.java | 10 +- .../typedevent/bus/impl/TypedEventBusImplTest.java | 101 +++--- .../typedevent/bus/osgi/FilterIntegrationTest.java | 8 +- .../osgi/UnhandledEventHandlerIntegrationTest.java | 3 +- .../org.apache.aries.typedevent.remote.api/pom.xml | 41 +++ .../aries/typedevent/remote/api/FilterDTO.java | 33 ++ .../remote/api/RemoteEventConstants.java | 49 +++ .../typedevent/remote/api/RemoteEventMonitor.java | 107 +++++++ .../aries/typedevent/remote/api/RemoteEvents.java | 27 ++ .../typedevent/remote/api/RemoteMonitorEvent.java | 32 ++ .../aries/typedevent/remote/api/package-info.java | 19 ++ .../pom.xml | 111 +++++++ .../impl/LocalEventBusForwarder.java | 191 +++++++++++ .../remoteservices/impl/RemoteEventBusImpl.java | 213 +++++++++++++ .../impl/RemoteServiceEventsActivator.java | 332 ++++++++++++++++++++ .../remote/remoteservices/spi/RemoteEventBus.java | 42 +++ .../remote/remoteservices/spi/package-info.java | 19 ++ .../remote/remoteservices/common/TestEvent.java | 21 ++ .../impl/RemoteEventBusImplTest.java | 151 +++++++++ .../osgi/AbstractIntegrationTest.java | 70 +++++ .../osgi/RemoteEventBusIntegrationTest.java | 348 +++++++++++++++++++++ .../test.bndrun | 56 ++++ .../org.apache.aries.typedevent.remote.spi/pom.xml | 50 +++ .../remote/spi/LocalEventConsumerManager.java | 183 +++++++++++ .../remote/spi/RemoteEventMonitorImpl.java | 166 ++++++++++ .../aries/typedevent/remote/spi/package-info.java | 19 ++ org.apache.aries.typedevent.remote/pom.xml | 21 ++ pom.xml | 13 + 28 files changed, 2382 insertions(+), 54 deletions(-) diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java index 5be83f9..21a0828 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java @@ -19,6 +19,11 @@ package org.apache.aries.typedevent.bus.impl; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; +import static org.osgi.namespace.implementation.ImplementationNamespace.IMPLEMENTATION_NAMESPACE; +import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION; import static org.osgi.util.converter.Converters.standardConverter; import java.lang.reflect.ParameterizedType; @@ -34,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; +import org.osgi.annotation.bundle.Capability; import org.osgi.framework.Constants; import org.osgi.framework.Filter; import org.osgi.framework.FrameworkUtil; @@ -45,6 +51,8 @@ import org.osgi.service.typedevent.UnhandledEventHandler; import org.osgi.service.typedevent.UntypedEventHandler; import org.osgi.util.converter.TypeReference; +@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.osgi.service.typedevent.TypedEventBus", uses=TypedEventBus.class) +@Capability(namespace=IMPLEMENTATION_NAMESPACE, name=TYPED_EVENT_IMPLEMENTATION, version=TYPED_EVENT_SPECIFICATION_VERSION) public class TypedEventBusImpl implements TypedEventBus { private static final TypeReference<List<String>> LIST_OF_STRINGS = new 
TypeReference<List<String>>() { @@ -248,7 +256,7 @@ public class TypedEventBusImpl implements TypedEventBus { } private Filter getFilter(Long serviceId, Map<String, Object> properties) throws IllegalArgumentException { - String key = "event.filter"; + String key = TYPED_EVENT_FILTER; return getFilter(serviceId, key, properties.get(key)); } diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java index 6667969..39bfbc9 100644 --- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java +++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java @@ -21,6 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.osgi.framework.Constants.SERVICE_ID; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TYPE; +import static org.osgi.util.converter.Converters.standardConverter; import java.util.HashMap; import java.util.Map; @@ -35,11 +40,9 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.osgi.framework.Constants; -import org.osgi.service.typedevent.TypedEventConstants; import org.osgi.service.typedevent.TypedEventHandler; import org.osgi.service.typedevent.UnhandledEventHandler; import org.osgi.service.typedevent.UntypedEventHandler; -import org.osgi.util.converter.Converters; public class TypedEventBusImplTest { @@ -132,31 +135,31 @@ public class 
TypedEventBusImplTest { Map<String, Object> serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/")); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 43L); + serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/")); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName()); + serviceProperties.put(SERVICE_ID, 43L); impl.addTypedEventHandler(handlerB, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(Constants.SERVICE_ID, 44L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(SERVICE_ID, 44L); impl.addUntypedEventHandler(untypedHandlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/")); - serviceProperties.put(Constants.SERVICE_ID, 45L); + serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/")); + serviceProperties.put(SERVICE_ID, 45L); impl.addUntypedEventHandler(untypedHandlerB, serviceProperties); @@ -206,20 +209,20 @@ public class TypedEventBusImplTest { event.message = "boo"; Map<String, Object> serviceProperties = new HashMap<>(); - 
serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handler, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 43L); + serviceProperties.put(TYPED_EVENT_TYPE, SpecialTestEvent.class.getName()); + serviceProperties.put(SERVICE_ID, 43L); impl.addTypedEventHandler(handler2, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(Constants.SERVICE_ID, 44L); + serviceProperties.put(SERVICE_ID, 44L); impl.addTypedEventHandler(handler3, serviceProperties); @@ -258,35 +261,35 @@ public class TypedEventBusImplTest { Map<String, Object> serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName()); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName()); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName()); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 43L); + serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName()); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName()); + serviceProperties.put(SERVICE_ID, 43L); impl.addTypedEventHandler(handlerB, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 44L); + 
serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName()); + serviceProperties.put(SERVICE_ID, 44L); impl.addUntypedEventHandler(untypedHandlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName()); - serviceProperties.put(Constants.SERVICE_ID, 45L); + serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName()); + serviceProperties.put(SERVICE_ID, 45L); impl.addUntypedEventHandler(untypedHandlerB, serviceProperties); - impl.deliver(event.getClass().getName(), Converters.standardConverter().convert(event).to(Map.class)); + impl.deliver(event.getClass().getName(), standardConverter().convert(event).to(Map.class)); assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS)); @@ -314,35 +317,35 @@ public class TypedEventBusImplTest { Map<String, Object> serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - serviceProperties.put("event.filter", "(message=foo)"); - serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)"); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - serviceProperties.put("event.filter", "(message=bar)"); - serviceProperties.put(Constants.SERVICE_ID, 43L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)"); + 
serviceProperties.put(SERVICE_ID, 43L); impl.addTypedEventHandler(handlerB, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put("event.filter", "(message=foo)"); - serviceProperties.put(Constants.SERVICE_ID, 44L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)"); + serviceProperties.put(SERVICE_ID, 44L); impl.addUntypedEventHandler(untypedHandlerA, serviceProperties); serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put("event.filter", "(message=bar)"); - serviceProperties.put(Constants.SERVICE_ID, 45L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)"); + serviceProperties.put(SERVICE_ID, 45L); impl.addUntypedEventHandler(untypedHandlerB, serviceProperties); @@ -395,10 +398,10 @@ public class TypedEventBusImplTest { Map<String, Object> serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - serviceProperties.put("event.filter", ""); - serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(TYPED_EVENT_FILTER, ""); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handlerA, serviceProperties); @@ -420,10 +423,10 @@ public class TypedEventBusImplTest { Map<String, Object> serviceProperties = new HashMap<>(); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); - serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName()); - 
serviceProperties.put("event.filter", "(message=foo)"); - serviceProperties.put(Constants.SERVICE_ID, 42L); + serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC); + serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName()); + serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)"); + serviceProperties.put(SERVICE_ID, 42L); impl.addTypedEventHandler(handlerA, serviceProperties); diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java index d4dcc78..2d17847 100644 --- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java +++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java @@ -16,6 +16,8 @@ */ package org.apache.aries.typedevent.bus.osgi; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; + import java.util.Dictionary; import java.util.Hashtable; @@ -71,12 +73,12 @@ public class FilterIntegrationTest extends AbstractIntegrationTest { @Test public void testFilteredListener() throws Exception { Dictionary<String, Object> props = new Hashtable<>(); - props.put("event.filter", "(message=foo)"); + props.put(TYPED_EVENT_FILTER, "(message=foo)"); regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); props = new Hashtable<>(); - props.put("event.filter", "(message=bar)"); + props.put(TYPED_EVENT_FILTER, "(message=bar)"); regs.add(context.registerService(TypedEventHandler.class, typedEventHandlerB, props)); @@ -107,7 +109,7 @@ public class FilterIntegrationTest extends AbstractIntegrationTest { @Test public void testFilteredListenerEmptyString() throws Exception { Dictionary<String, Object> props = new Hashtable<>(); - props.put("event.filter", ""); + props.put(TYPED_EVENT_FILTER, ""); 
regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java index c781b62..084a92a 100644 --- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java +++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java @@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.after; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; import java.util.Dictionary; import java.util.Hashtable; @@ -128,7 +129,7 @@ public class UnhandledEventHandlerIntegrationTest extends AbstractIntegrationTes public void testUnhandledDueToFilter() throws InterruptedException { Dictionary<String, Object> props = new Hashtable<>(); - props.put("event.filter", "(message=foo)"); + props.put(TYPED_EVENT_FILTER, "(message=foo)"); regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml new file mode 100644 index 0000000..a54960e --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml @@ -0,0 +1,41 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + 
<groupId>org.apache.aries.typedevent</groupId> + <artifactId>typedevent-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.aries.typedevent.remote</groupId> + <artifactId>org.apache.aries.typedevent.remote.api</artifactId> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.typedevent</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>osgi.annotation</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.annotation.bundle</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>biz.aQute.bnd</groupId> + <artifactId>bnd-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java new file mode 100644 index 0000000..56902c7 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.typedevent.remote.api; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.dto.DTO; + +/** + * A monitoring event filter. + * + * If both LDAP and regular expressions are supplied, then both must match. + */ +@ProviderType +public class FilterDTO extends DTO { + + public String ldapExpression; + + public String regularExpression; +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java new file mode 100644 index 0000000..9b0ab57 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.typedevent.remote.api; + +import org.osgi.annotation.versioning.ProviderType; + +/** + * This interface should not be used by typical users of the + * Typed Event specification. It is intended to be a bridge + * between different mechanisms for broadcasting remote events + */ + +@ProviderType +public class RemoteEventConstants { + + /** + * This property key will be set to true in any event that originated from a remote system. + * This is to allow different remoting implementations to identify events which should not + * be sent on externally, as they are already external. + */ + public static final String REMOTE_EVENT_MARKER = ".org.apache.aries.typedevent.remote"; + + /** + * This service property can be used by Event Handler whiteboard services to signal that + * they wish to receive remote events by using the value <code>true</code>. Depending + * upon the configuration of the remote event backend it may not be necessary to supply + * this property to receive remote events. 
+ */ + public static final String RECEIVE_REMOTE_EVENTS = "org.apache.aries.typedevent.remote.events"; + + private RemoteEventConstants() { + // Deliberately impossible to construct + } + +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java new file mode 100644 index 0000000..0bbcf9c --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.api; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.pushstream.PushStream; + +import java.time.Instant; + +/** + * The {@link RemoteEventMonitor} service can be used to monitor the events that are + * sent using the EventBus, and that are received from remote EventBus + * instances + */ +@ProviderType +public interface RemoteEventMonitor { + + /** + * Get a stream of events that match any of the filters, starting now. + * <p> + * Filter expressions may be supplied and applied by the monitoring implementation. + * In some cases this may be more optimal than adding your own filter to the returned + * PushStream. + * + * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType} + * field is available with the key <code>-eventType</code> and the + * {@link RemoteMonitorEvent#publishType} field is available with the key + * <code>-publishType</code>, in addition to fields defined in the event. + * If the event contains nested data structures then those are accessible using + * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code> + * would correspond to the <code>bar</code> field of the <code>foo</code> value + * from the event). + * <p> + * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match. + * A RegEx pattern allows the whole event content to be matched, without necessarily specifying + * a key (although keys are present and separated with ':'). + * @return A stream of event data + */ + PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters); + + /** + * Get a stream of events, including up to the + * requested number of historical data events, that match any of the filters. + * + * @param history The requested number of historical + * events, note that fewer than this number of events + * may be returned if history is unavailable, or if + * insufficient events have been sent.
+ * + * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType} + * field is available with the key <code>-eventType</code> and the + * {@link RemoteMonitorEvent#publishType} field is available with the key + * <code>-publishType</code>, in addition to fields defined in the event. + * If the event contains nested data structures then those are accessible using + * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code> + * would correspond to the <code>bar</code> field of the <code>foo</code> value + * from the event). + * <p> + * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match. + * A RegEx pattern allows the whole event content to be matched, without necessarily specifying + * a key (although keys are present and separated with ':'). + * + * @return A stream of event data + */ + PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO...filters); + + /** + * Get a stream of events, including historical + * data events from the supplied time onwards + * + * @param history The requested time after which + * historical events should be included. Note + * that events may have been discarded, or history + * unavailable. + * + * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType} + * field is available with the key <code>-eventType</code> and the + * {@link RemoteMonitorEvent#publishType} field is available with the key + * <code>-publishType</code>, in addition to fields defined in the event. + * If the event contains nested data structures then those are accessible using + * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code> + * would correspond to the <code>bar</code> field of the <code>foo</code> value + * from the event). + * <p> + * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+ * A RegEx pattern allows the whole event content to be matched, without necessarily specifying + * a key (although keys are present and separated with ':'). + * + * @return A stream of event data + */ + PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO...filters); + +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java new file mode 100644 index 0000000..a5ad278 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.api; + +import org.osgi.service.component.annotations.ComponentPropertyType; + +/** + * This annotation can be used on a DS component to mark it as wanting to receive remote events + */ +@ComponentPropertyType +public @interface RemoteEvents { + public static final java.lang.String PREFIX_ = "org.apache.aries.typedevent."; +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java new file mode 100644 index 0000000..2391b39 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.typedevent.remote.api; + +import org.osgi.annotation.versioning.ProviderType; + +/** + * A monitoring event. 
+ */ +@ProviderType +public class RemoteMonitorEvent extends org.osgi.service.typedevent.monitor.MonitorEvent { + + public static enum PublishType { + LOCAL, REMOTE; + } + + public PublishType publishType; +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java new file mode 100644 index 0000000..da953a5 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@org.osgi.annotation.bundle.Export +@org.osgi.annotation.versioning.Version("0.0.1") +package org.apache.aries.typedevent.remote.api; \ No newline at end of file diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml new file mode 100644 index 0000000..6647b7a --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml @@ -0,0 +1,111 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.aries.typedevent</groupId> + <artifactId>typedevent-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <groupId>org.apache.aries.typedevent.remote.remoteservices</groupId> + <artifactId>org.apache.aries.typedevent.remote.remoteservices</artifactId> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.aries.typedevent</groupId> + <artifactId>typedevent-test-bom</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.typedevent</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>osgi.annotation</artifactId> + </dependency> + <dependency> + <groupId>org.apache.aries.typedevent.remote</groupId> + <artifactId>org.apache.aries.typedevent.remote.spi</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.annotation.bundle</artifactId> + </dependency> +
<dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.namespace.service</artifactId> + </dependency> + <dependency> + <groupId>org.apache.aries.component-dsl</groupId> + <artifactId>org.apache.aries.component-dsl.component-dsl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.converter</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.aries.typedevent</groupId> + <artifactId>org.apache.aries.typedevent.bus</artifactId> + <version>0.0.1-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.test.junit5</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>biz.aQute.bnd</groupId> + <artifactId>bnd-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>biz.aQute.bnd</groupId> + <artifactId>bnd-resolver-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>biz.aQute.bnd</groupId> + <artifactId>bnd-testing-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java 
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java new file mode 100644 index 0000000..9a1807f --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.remoteservices.impl; + +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.osgi.util.converter.Converters.standardConverter; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus; +import org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager; +import org.osgi.framework.Constants; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.util.converter.TypeReference; + +/** + * This class is responsible for taking events from the local framework and + * sending them on to interested remote frameworks + */ +public class LocalEventBusForwarder extends LocalEventConsumerManager { + + private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() { + }; + + /** + * Map access and mutation must be synchronized on {@link #lock}. Values from + * the map should be copied as the contents are not thread safe. + */ + private final Map<String, Map<RemoteEventBus, Filter>> eventTypeToRemotes = new HashMap<>(); + + /** + * Map access and mutation must be synchronized on {@link #lock}. + * Values from the map should be copied as the contents are not thread safe. + */ + private final Map<Long, List<String>> remoteTopicInterests = new HashMap<>(); + + /** + * Map access and mutation must be synchronized on {@link #lock}. + * Values from the map should be copied as the contents are not thread safe. 
+ */ + private final Map<Long, RemoteEventBus> remoteBuses = new HashMap<>(); + + private final Object lock = new Object(); + + @Override + public void notifyUntyped(String topic, Map<String, Object> event) { + Map<RemoteEventBus, Filter> possibleTargets; + synchronized (lock) { + possibleTargets = eventTypeToRemotes.getOrDefault(topic, emptyMap()); + } + + possibleTargets.entrySet().stream() + .filter(e -> e.getValue() == null || e.getValue().matches(event)) + .map(Entry::getKey) + .forEach(r -> r.notify(topic, event)); + } + + private Long getServiceId(Map<String, Object> properties) { + return standardConverter().convert(properties.get(Constants.SERVICE_ID)).to(Long.class); + } + + void addRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) { + doAdd(remote, properties); + updateRemoteInterest(); + } + + private void doAdd(RemoteEventBus remote, Map<String, Object> properties) { + Object consumed = properties.get(RemoteEventBus.REMOTE_EVENT_FILTERS); + + if (consumed == null) { + // TODO log a broken behaviour + return; + } + + Map<String, Filter> topicsToFilters = standardConverter().convert(consumed).to(LIST_OF_STRINGS) + .stream() + .map(s -> s.split("=", 2)) + .collect(toMap(s -> s[0], s -> safeCreateFilter(s[1]))); + + Long serviceId = getServiceId(properties); + + List<String> interestedTopics = topicsToFilters.keySet().stream().collect(toList()); + synchronized (lock) { + remoteBuses.put(serviceId, remote); + remoteTopicInterests.put(serviceId, interestedTopics); + + interestedTopics.forEach(s -> { + Map<RemoteEventBus, Filter> perTopicMap = eventTypeToRemotes + .computeIfAbsent(s, x -> new HashMap<>()); + perTopicMap.put(remote, topicsToFilters.get(s)); + }); + } + } + + private Filter safeCreateFilter(String filterString) { + try { + return FrameworkUtil.createFilter(filterString); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + try { + return 
FrameworkUtil.createFilter("(&(x=true)(x=false))"); + } catch (InvalidSyntaxException e1) { + // TODO log properly + throw new RuntimeException("Serious problem!"); + } + } + } + + void updatedRemoteEventBus(Map<String, Object> properties) { + Long serviceId = getServiceId(properties); + synchronized (lock) { + RemoteEventBus remote = remoteBuses.get(serviceId); + doRemove(remote, properties); + doAdd(remote, properties); + } + updateRemoteInterest(); + } + + void removeRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) { + doRemove(remote, properties); + updateRemoteInterest(); + } + + private void doRemove(RemoteEventBus remote, Map<String, Object> properties) { + Long serviceId = getServiceId(properties); + + synchronized (lock) { + remoteBuses.remove(serviceId); + List<String> consumed = remoteTopicInterests.remove(serviceId); + if(consumed != null) { + consumed.forEach(s -> { + Map<RemoteEventBus, ?> perTopic = eventTypeToRemotes.get(s); + if(perTopic != null) { + perTopic.remove(remote); + if(perTopic.isEmpty()) { + eventTypeToRemotes.remove(s); + } + } + }); + } + } + } + + private void updateRemoteInterest() { + + Map<String, String> targets; + synchronized (lock) { + targets = eventTypeToRemotes.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, + e -> e.getValue().values().stream() + .map(f -> f == null ? "" : f.toString()) + .reduce("", this::mergeFilterStrings))); + + } + + updateTargets(targets); + } + + private String mergeFilterStrings(String a, String b) { + if(a == null || "".equals(a)) { + return b == null ? 
"" : b; + } else if (b == null || "".equals(b)) { + return a; + } else { + return "(|" + a + b + ")"; + } + } +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java new file mode 100644 index 0000000..67f63e4 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.remoteservices.impl; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE; + +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.apache.aries.typedevent.remote.api.RemoteEventConstants; +import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus; +import org.osgi.annotation.bundle.Capability; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.typedevent.TypedEventBus; +import org.osgi.service.typedevent.annotations.RequireTypedEvents; + +/** + * This class implements {@link RemoteEventBus} and is responsible for receiving + * events from remote frameworks and publishing them in the local framework + */ +@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus", uses=RemoteEventBus.class) +@RequireTypedEvents +public class RemoteEventBusImpl implements RemoteEventBus { + + private final TypedEventBus eventBus; + + private ServiceRegistration<RemoteEventBus> reg; + + private Map<String, Filter> topicsToFilters = new HashMap<>(); + + private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>(); + + private final Object lock = new Object(); + + public RemoteEventBusImpl(TypedEventBus eventBus) { + this.eventBus = eventBus; + } + + public void init(BundleContext ctx) { + ServiceRegistration<RemoteEventBus> reg = ctx.registerService(RemoteEventBus.class, this, null); 
+ + Map<String, Filter> filters; + synchronized(lock) { + this.reg = reg; + filters = topicsToFilters; + } + updateReg(filters); + } + + public void destroy() { + try { + ServiceRegistration<?> reg; + synchronized (lock) { + reg = this.reg; + this.reg = null; + } + + if(reg != null) { + reg.unregister(); + } + } catch (IllegalStateException ise) { + // TODO log + } + } + + @Override + public void notify(String topic, Map<String, Object> properties) { + + boolean hasTopicInterest; + Filter filter; + synchronized (lock) { + hasTopicInterest = topicsToFilters.containsKey(topic); + filter = topicsToFilters.get(topic); + } + + if(hasTopicInterest) { + if(filter == null || filter.matches(properties)) { + properties.put(RemoteEventConstants.REMOTE_EVENT_MARKER, Boolean.TRUE); + eventBus.deliverUntyped(topic, properties); + } else { + //TODO log filter mismatch + } + } else { + // TODO log topic mismatch + } + } + + /** + * Update the data structures and registration to reflect the topic interests + * of the local framework + * + * @param id + * @param topics + * @param filter + */ + void updateLocalInterest(Long id, List<String> topics, Filter filter) { + + boolean doUpdate = false; + + Map<String, Filter> newData = topics.stream() + .collect(toMap(identity(), x -> filter, (a,b) -> a)); + + Map<String, Filter> updatedFilters; + synchronized(lock) { + doUpdate = true; + servicesToInterests.put(id, newData); + topicsToFilters = getUpdatedFilters(); + updatedFilters = topicsToFilters; + } + + if(doUpdate) { + updateReg(updatedFilters); + } + } + + private Map<String, Filter> getUpdatedFilters() { + synchronized (lock) { + return servicesToInterests.values().stream() + .flatMap(m -> m.entrySet().stream()) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue, + this::combineFilters)); + } + } + + private Filter combineFilters(Filter a, Filter b) { + if(a == null) { + return b; + } else if (b == null) { + return a; + } else { + try { + return 
FrameworkUtil.createFilter("(|" + a.toString() + b.toString() + ")"); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + throw new RuntimeException(e); + } + } + } + + private void updateReg(Map<String, Filter> filters) { + + Hashtable<String, Object> props = new Hashtable<>(); + + props.put(Constants.SERVICE_EXPORTED_INTERFACES, RemoteEventBus.class.getName()); + props.put(Constants.SERVICE_EXPORTED_INTENTS, "osgi.basic"); + List<String> remoteFilters = filters.entrySet().stream() + .map(e -> e.getKey() + "=" + (e.getValue() == null ? "" : e.getValue().toString())) + .collect(toList()); + props.put(REMOTE_EVENT_FILTERS, remoteFilters); + + + ServiceRegistration<?> reg; + synchronized (lock) { + reg = this.reg; + } + + if(reg != null) { + // Only update if there is a change + Object existingFilters = reg.getReference().getProperty(REMOTE_EVENT_FILTERS); + if(!remoteFilters.equals(existingFilters)) { + reg.setProperties(props); + } + // Deal with a race condition if + Map<String, Filter> updatedFilters; + synchronized (lock) { + updatedFilters = topicsToFilters; + } + if(!updatedFilters.equals(filters)) { + updateReg(updatedFilters); + } + } + } + + void removeLocalInterest(Long id) { + + Map<String, Filter> updatedFilters; + synchronized(lock) { + if(servicesToInterests.remove(id) == null) { + return; + } + topicsToFilters = getUpdatedFilters(); + updatedFilters = topicsToFilters; + } + + updateReg(updatedFilters); + } +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java new file mode 100644 index 0000000..83b80dd --- /dev/null +++ 
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.remoteservices.impl; + +import static java.lang.Boolean.TRUE; +import static java.util.function.Function.identity; +import static org.apache.aries.component.dsl.OSGi.all; +import static org.apache.aries.component.dsl.OSGi.bundleContext; +import static org.apache.aries.component.dsl.OSGi.coalesce; +import static org.apache.aries.component.dsl.OSGi.configuration; +import static org.apache.aries.component.dsl.OSGi.just; +import static org.apache.aries.component.dsl.OSGi.once; +import static org.apache.aries.component.dsl.OSGi.register; +import static org.apache.aries.component.dsl.OSGi.service; +import static org.apache.aries.component.dsl.OSGi.serviceReferences; +import static org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager.ARIES_LOCAL_EVENT_PROXY; +import static org.osgi.framework.Constants.BUNDLE_ACTIVATOR; +import static org.osgi.framework.Constants.SERVICE_ID; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; +import static org.osgi.util.converter.Converters.standardConverter; + +import java.lang.reflect.ParameterizedType; +import java.util.Arrays; +import java.util.Collections; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.aries.component.dsl.OSGi; +import org.apache.aries.component.dsl.OSGiResult; +import org.apache.aries.typedevent.remote.api.RemoteEventMonitor; +import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus; +import org.apache.aries.typedevent.remote.spi.RemoteEventMonitorImpl; +import org.osgi.annotation.bundle.Header; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import 
org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.typedevent.TypedEventBus; +import org.osgi.service.typedevent.TypedEventConstants; +import org.osgi.service.typedevent.TypedEventHandler; +import org.osgi.service.typedevent.UntypedEventHandler; +import org.osgi.service.typedevent.monitor.TypedEventMonitor; +import org.osgi.util.converter.TypeReference; +import org.osgi.util.tracker.ServiceTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Header(name = BUNDLE_ACTIVATOR, value = "${@class}") +public class RemoteServiceEventsActivator implements BundleActivator { + + private static final Logger _log = LoggerFactory.getLogger(RemoteServiceEventsActivator.class); + + OSGiResult eventBus; + + @Override + public void start(BundleContext bundleContext) throws Exception { + if (_log.isDebugEnabled()) { + _log.debug("Aries Remote Typed Events (Remote Services) Starting"); + } + + eventBus = coalesce(configuration("org.apache.aries.typedevent.remote.remoteservices"), just(Hashtable::new)) + .map(this::toConfigProps).flatMap(configuration -> createProgram(configuration)).run(bundleContext); + + if (_log.isDebugEnabled()) { + _log.debug("Aries Typed Event Bus Started"); + } + } + + private OSGi<?> createProgram(Map<String, ?> configuration) { + + OSGi<Object> monitor = service(once(serviceReferences(TypedEventMonitor.class))) + .map(RemoteEventMonitorImpl::new) + .flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>())); + + OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class))) + .map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy()) + .flatMap(rebi -> all( + just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast) + .effects(st -> st.open(), st -> st.close()), + just(new TypedEventTracker(ctx, 
rebi)).map(ServiceTracker.class::cast).effects(st -> st.open(), + st -> st.close())))); + + OSGi<Object> local = bundleContext() + .flatMap(ctx -> just(new LocalEventBusForwarder()).effects(lebf -> lebf.start(ctx), lebf -> lebf.stop()) + .flatMap(lebf -> serviceReferences(RemoteEventBus.class, "(service.imported=true)", csr -> { + lebf.updatedRemoteEventBus(getServiceProps(csr.getServiceReference())); + return false; + }).flatMap(csr -> service(csr).effects( + reb -> lebf.addRemoteEventBus(reb, getServiceProps(csr.getServiceReference())), + reb -> lebf.removeRemoteEventBus(reb, getServiceProps(csr.getServiceReference())))))); + + return all(monitor, remote, local); + } + + private Map<String, Object> toConfigProps(Dictionary<String, ?> config) { + Enumeration<String> keys = config.keys(); + Map<String, Object> map = new HashMap<>(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + map.put(key, config.get(key)); + } + return map; + } + + private Map<String, Object> getServiceProps(ServiceReference<?> ref) { + return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty)); + } + + @Override + public void stop(BundleContext context) throws Exception { + if (_log.isDebugEnabled()) { + _log.debug("Aries Typed Event Bus Stopping"); + } + + eventBus.close(); + + if (_log.isDebugEnabled()) { + _log.debug("Aries Typed Event Bus Stopped"); + } + } + + private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() { + }; + + private static Long getServiceId(ServiceReference<?> ref) { + return standardConverter().convert(ref.getProperty(SERVICE_ID)).to(Long.class); + } + + private static List<String> getTopics(ServiceReference<?> ref) { + return standardConverter().convert(ref.getProperty(TYPED_EVENT_TOPICS)).to(LIST_OF_STRINGS); + } + + private static Filter getFilter(ServiceReference<?> ref) throws InvalidSyntaxException { + String filter = 
standardConverter().convert(ref.getProperty(TYPED_EVENT_FILTER)).to(String.class); + if (filter == null || "".equals(filter)) { + return null; + } else { + return FrameworkUtil.createFilter(filter); + } + } + + private static class UntypedEventTracker extends ServiceTracker<UntypedEventHandler, Object> { + + private final RemoteEventBusImpl impl; + + public UntypedEventTracker(BundleContext context, RemoteEventBusImpl impl) { + super(context, UntypedEventHandler.class, null); + this.impl = impl; + } + + @Override + public Object addingService(ServiceReference<UntypedEventHandler> reference) { + + if(TRUE.equals(reference.getProperty(ARIES_LOCAL_EVENT_PROXY))) { + // Ignore remote interest proxies + return null; + } + + Filter filter; + try { + filter = getFilter(reference); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + return reference; + } + impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter); + return reference; + } + + @Override + public void modifiedService(ServiceReference<UntypedEventHandler> reference, Object service) { + Filter filter; + try { + filter = getFilter(reference); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + impl.removeLocalInterest(getServiceId(reference)); + return; + } + impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter); + } + + @Override + public void removedService(ServiceReference<UntypedEventHandler> reference, Object service) { + impl.removeLocalInterest(getServiceId(reference)); + } + }; + + @SuppressWarnings("rawtypes") + private static class TypedEventTracker extends ServiceTracker<TypedEventHandler, TypedEventHandler> { + + private final RemoteEventBusImpl impl; + + public TypedEventTracker(BundleContext context, RemoteEventBusImpl impl) { + super(context, TypedEventHandler.class, null); + this.impl = impl; + } + + @Override + public TypedEventHandler addingService(ServiceReference<TypedEventHandler> reference) { 
+ TypedEventHandler toReturn = context.getService(reference); + Filter filter; + try { + filter = getFilter(reference); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + return toReturn; + } + List<String> topics = findTopics(reference, toReturn); + if (!topics.isEmpty()) { + impl.updateLocalInterest(getServiceId(reference), topics, filter); + } + return toReturn; + } + + private List<String> findTopics(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) { + List<String> topics = getTopics(reference); + if (topics.isEmpty()) { + Object type = reference.getProperty(TypedEventConstants.TYPED_EVENT_TYPE); + if (type != null) { + topics = Collections.singletonList(String.valueOf(type).replace(".", "/")); + } else { + Class<?> clazz = discoverTypeForTypedHandler(service); + if (clazz != null) { + topics = Collections.singletonList(clazz.getName().replace(".", "/")); + } + } + } + return topics; + } + + /** + * Extensively copied from the Core Event Bus - is there a better way to share + * this? 
+ * + * @param handler + * @param properties + * @return + */ + private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler) { + Class<?> clazz = null; + Class<?> toCheck = handler.getClass(); + while (clazz == null) { + clazz = findDirectlyImplemented(toCheck); + + if (clazz != null) { + break; + } + + clazz = processInterfaceHierarchyForClass(toCheck); + + if (clazz != null) { + break; + } + + toCheck = toCheck.getSuperclass(); + } + + return clazz; + } + + private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) { + Class<?> clazz = null; + for (Class<?> iface : toCheck.getInterfaces()) { + clazz = findDirectlyImplemented(iface); + + if (clazz != null) { + break; + } + + clazz = processInterfaceHierarchyForClass(iface); + + if (clazz != null) { + break; + } + } + return clazz; + } + + private Class<?> findDirectlyImplemented(Class<?> toCheck) { + return Arrays.stream(toCheck.getGenericInterfaces()).filter(ParameterizedType.class::isInstance) + .map(ParameterizedType.class::cast).filter(t -> TypedEventHandler.class.equals(t.getRawType())) + .map(t -> t.getActualTypeArguments()[0]).findFirst().map(Class.class::cast).orElse(null); + } + + @Override + public void modifiedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) { + Filter filter; + try { + filter = getFilter(reference); + } catch (InvalidSyntaxException e) { + // TODO Auto-generated catch block + impl.removeLocalInterest(getServiceId(reference)); + return; + } + + List<String> topics = findTopics(reference, service); + if (topics.isEmpty()) { + impl.removeLocalInterest(getServiceId(reference)); + } else { + impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter); + } + } + + @Override + public void removedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) { + impl.removeLocalInterest(getServiceId(reference)); + } + }; +} diff --git 
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java new file mode 100644 index 0000000..2c0336e --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.typedevent.remote.remoteservices.spi; + +import java.util.Map; + +import org.osgi.annotation.versioning.ProviderType; + +/** + * This interface should not be used by typical users of the + * Typed Event specification. It is intended to be a bridge + * between different mechanisms for broadcasting remote events + */ + +@ProviderType +public interface RemoteEventBus { + + /** + * This service property provides a String+ containing <topic>=<filter> + * entries indicating the events that the remote nodes are interested in. 
+ */ + public static final String REMOTE_EVENT_FILTERS = "remote.event.filters"; + + /** + * Called to notify this instance of an event from a remote framework + */ + public void notify(String topic, Map<String, Object> eventData); +} diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java new file mode 100644 index 0000000..ed6cdcc --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ [email protected] [email protected]("0.0.1") +package org.apache.aries.typedevent.remote.remoteservices.spi; \ No newline at end of file diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java new file mode 100644 index 0000000..3d6dc4e --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aries.typedevent.remote.remoteservices.common;
+
+/**
+ * Minimal event type used by the remote services tests.
+ */
+public class TestEvent {
+    /** Event payload; the integration tests match on it under the key {@code message}. */
+    public String message;
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
new file mode 100644
index 0000000..9ddd8e1
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.osgi.framework.FrameworkUtil.createFilter;
+
+import java.util.Arrays;
+import java.util.Dictionary;
+
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.TypedEventBus;
+
+/**
+ * Unit tests checking which service properties {@link RemoteEventBusImpl}
+ * publishes on its {@link RemoteEventBus} registration.
+ */
+@SuppressWarnings("unchecked")
+public class RemoteEventBusImplTest {
+
+    @Mock
+    BundleContext context;
+
+    @Mock
+    ServiceRegistration<RemoteEventBus> remoteReg;
+    @Mock
+    ServiceReference<RemoteEventBus> remoteRef;
+
+    @Mock
+    TypedEventBus eventBusImpl;
+
+    RemoteEventBusImpl remoteImpl;
+
+    private AutoCloseable mocks;
+
+    @BeforeEach
+    public void start() {
+        mocks = MockitoAnnotations.openMocks(this);
+
+        Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);
+        Mockito.when(context.registerService(Mockito.eq(RemoteEventBus.class),
+                Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
+
+        remoteImpl = new RemoteEventBusImpl(eventBusImpl);
+    }
+
+    @AfterEach
+    public void destroy() throws Exception {
+        remoteImpl.destroy();
+        mocks.close();
+    }
+
+    @Test
+    public void testEmptyStart() {
+        remoteImpl.init(context);
+
+        assertRegisteredWithoutProperties();
+        assertExportedWithFilters(emptyList(), latestProperties(1));
+    }
+
+    @Test
+    public void testStartWithDetails() throws InvalidSyntaxException {
+        // Interest declared before init must show up in the first property update
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+
+        remoteImpl.init(context);
+
+        assertRegisteredWithoutProperties();
+        assertExportedWithFilters(Arrays.asList("FOO=(fizz=buzz)"), latestProperties(1));
+    }
+
+    @Test
+    public void testLateRegisterOfListener() throws InvalidSyntaxException {
+        remoteImpl.init(context);
+
+        assertRegisteredWithoutProperties();
+        assertExportedWithFilters(emptyList(), latestProperties(1));
+
+        // Add a listener to the remote
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+
+        assertExportedWithFilters(Arrays.asList("FOO=(fizz=buzz)"), latestProperties(2));
+    }
+
+    /**
+     * Verifies that the bus registered itself exactly once and passed
+     * {@code null} as the initial service properties.
+     */
+    private void assertRegisteredWithoutProperties() {
+        ArgumentCaptor<Dictionary<String, Object>> initial = ArgumentCaptor.forClass(Dictionary.class);
+
+        Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+                initial.capture());
+
+        assertNull(initial.getValue());
+    }
+
+    /**
+     * Verifies that {@code setProperties} was called the expected number of
+     * times and returns the properties passed in the most recent call.
+     */
+    private Dictionary<String, Object> latestProperties(int expectedUpdates) {
+        ArgumentCaptor<Dictionary<String, Object>> updates = ArgumentCaptor.forClass(Dictionary.class);
+
+        Mockito.verify(remoteReg, Mockito.times(expectedUpdates)).setProperties(updates.capture());
+
+        return updates.getValue();
+    }
+
+    /**
+     * Asserts that the properties export the {@link RemoteEventBus} interface
+     * and advertise exactly the expected filter entries.
+     */
+    private void assertExportedWithFilters(Object expectedFilters, Dictionary<String, Object> props) {
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(expectedFilters, props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+    }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
new file mode 100644
index 0000000..dfcf586
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.osgi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.mockito.ArgumentMatcher;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+public abstract class AbstractIntegrationTest {
+
+    /** The {@link TestEvent} class name with every '.' replaced by '/'. */
+    protected static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
+    /** Registrations made by a test; all are unregistered in {@link #tearDown()}. */
+    protected final List<ServiceRegistration<?>> regs = new ArrayList<>();
+
+    /**
+     * Unregisters every service recorded in {@link #regs} and empties the list.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        for (ServiceRegistration<?> sr : regs) {
+            try {
+                sr.unregister();
+            } catch (Exception ignored) {
+                // Best effort: the registration may already be gone, for example
+                // because the framework that owned it has been shut down
+            }
+        }
+        regs.clear();
+    }
+
+    /**
+     * @param message the expected message, must not be {@code null}
+     * @return a matcher accepting non-null {@link TestEvent}s carrying the message
+     */
+    protected ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+        // Null guard mirrors isUntypedTestEventWithMessage: a matcher must not
+        // throw when it is offered a null argument
+        return argument -> argument != null && message.equals(argument.message);
+    }
+
+    /**
+     * @param message the expected message, must not be {@code null}
+     * @return a matcher accepting non-null maps whose {@code "message"} entry
+     *         equals the message
+     */
+    protected ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
+        return argument -> argument != null && message.equals(argument.get("message"));
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java new
file mode 100644 index 0000000..9788848 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aries.typedevent.remote.remoteservices.osgi; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS; + +import java.io.File; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.ServiceLoader; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent; +import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleException; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceException; +import org.osgi.framework.ServiceFactory; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; +import org.osgi.framework.launch.Framework; +import org.osgi.framework.launch.FrameworkFactory; +import org.osgi.framework.wiring.FrameworkWiring; +import org.osgi.service.typedevent.TypedEventBus; +import org.osgi.service.typedevent.UnhandledEventHandler; +import org.osgi.service.typedevent.UntypedEventHandler; +import org.osgi.test.common.annotation.InjectBundleContext; 
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles, getting
+ * or registering services, or in other ways, and then observing the result on
+ * the bundle(s) being tested.
+ *
+ * The test launches additional frameworks and stands in for a remote services
+ * implementation: a {@link ServiceTracker} in each framework re-registers every
+ * {@link RemoteEventBus} it finds into the other frameworks, marked with
+ * {@code service.imported}.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
+
+    private static final String REMOTE_BUS = RemoteEventBus.class.getName();
+    private static final String UNTYPED_HANDLER = UntypedEventHandler.class.getName();
+    private static final String UNHANDLED_HANDLER = UnhandledEventHandler.class.getName();
+
+    /** Upper bound, in milliseconds, on waiting for a child framework to stop. */
+    private static final long FRAMEWORK_STOP_TIMEOUT = 10000;
+
+    private Map<UUID, Framework> frameworks;
+    private Map<UUID, ServiceTracker<?,?>> remoteServicePublishers = new ConcurrentHashMap<>();
+
+    @InjectBundleContext
+    BundleContext bundleContext;
+
+    @InjectService
+    TypedEventBus bus;
+
+    @Mock
+    UntypedEventHandler untypedEventHandler;
+
+    @Mock
+    UnhandledEventHandler unhandledEventHandler;
+
+    AutoCloseable mocks;
+
+    /**
+     * Launches the child frameworks, adds the host framework to the set and
+     * opens one cross-framework publisher per framework.
+     */
+    @BeforeEach
+    public void setUpFrameworks() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+
+        assertNotNull(bundleContext, "OSGi Bundle tests must be run inside an OSGi framework");
+
+        frameworks = createFrameworks(2);
+        frameworks.put(getMasterFrameworkUUID(), bundleContext.getBundle(0).adapt(Framework.class));
+
+        for (Entry<UUID, Framework> entry : frameworks.entrySet()) {
+            Framework f = entry.getValue();
+
+            BundleContext context = f.getBundleContext();
+            ServiceTracker<Object, Object> tracker = createCrossFrameworkPublisher(entry, context);
+
+            remoteServicePublishers.put(entry.getKey(), tracker);
+        }
+    }
+
+    /**
+     * Creates and opens a tracker that mirrors the {@link RemoteEventBus}
+     * services of one framework into every other framework.
+     *
+     * @param entry the id and instance of the framework being tracked
+     * @param context the bundle context used to track services in that framework
+     * @return the opened tracker
+     */
+    private ServiceTracker<Object, Object> createCrossFrameworkPublisher(Entry<UUID, Framework> entry,
+            BundleContext context) {
+        ServiceTracker<Object, Object> tracker = new ServiceTracker<Object, Object>(context,
+                REMOTE_BUS, null) {
+
+            // Mirror registrations, keyed by the id of the framework they were made in
+            Map<UUID, ServiceRegistration<?>> registered = new ConcurrentHashMap<>();
+
+            @Override
+            public Object addingService(ServiceReference<Object> reference) {
+
+                // Services registered by the system bundle are the mirrors
+                // created below, so they must not be mirrored again
+                if(reference.getBundle().getBundleId() == 0) {
+                    return null;
+                }
+
+                Object service = super.addingService(reference);
+
+                UUID sourceId = entry.getKey();
+                for (Entry<UUID, Framework> e : frameworks.entrySet()) {
+                    UUID targetId = e.getKey();
+                    if(sourceId.equals(targetId)) {
+                        // Skip this framework as it's the same framework the service came from
+                        continue;
+                    }
+
+                    Framework fw = e.getValue();
+
+                    // Key by the target framework: keying by the source id would
+                    // overwrite (and leak) registrations once there is more than
+                    // one target framework
+                    registered.put(targetId, fw.getBundleContext().registerService(
+                            REMOTE_BUS, new EventHandlerFactory(service, REMOTE_BUS),
+                            getRegistrationProps(reference)));
+                }
+
+                return service;
+            }
+
+            /** Copies the reference's properties and adds {@code service.imported}. */
+            Dictionary<String, Object> getRegistrationProps(ServiceReference<?> ref) {
+                Dictionary<String, Object> toReturn = new Hashtable<String, Object>();
+                String[] props = ref.getPropertyKeys();
+                for(String key : props) {
+                    toReturn.put(key, ref.getProperty(key));
+                }
+
+                toReturn.put("service.imported", true);
+                return toReturn;
+            }
+
+            @Override
+            public void modifiedService(ServiceReference<Object> reference, Object service) {
+                for(ServiceRegistration<?> reg : registered.values()) {
+                    reg.setProperties(getRegistrationProps(reference));
+                }
+            }
+
+            @Override
+            public void removedService(ServiceReference<Object> reference, Object service) {
+                for (ServiceRegistration<?> registration : registered.values()) {
+                    try {
+                        registration.unregister();
+                    } catch (Exception e) {
+                        // Never mind
+                    }
+                }
+                registered.clear();
+                super.removedService(reference, service);
+            }
+
+        };
+        tracker.open(true);
+        return tracker;
+    }
+
+    /**
+     * Closes the publishers and stops every child framework, waiting for each
+     * to finish stopping. Safe to call when set-up failed part way through.
+     */
+    @AfterEach
+    public void shutdownFrameworks() throws Exception {
+
+        remoteServicePublishers.values().forEach(ServiceTracker::close);
+        remoteServicePublishers.clear();
+
+        if(frameworks != null) {
+            // The host framework is running this test and must not be stopped
+            frameworks.remove(getMasterFrameworkUUID());
+
+            for (Framework f : frameworks.values()) {
+                try {
+                    f.stop();
+                    // stop() returns before shutdown completes; wait so the next
+                    // test does not reuse the storage area of a live framework
+                    f.waitForStop(FRAMEWORK_STOP_TIMEOUT);
+                } catch (BundleException be) {
+                    // Never mind
+                }
+            }
+
+            frameworks.clear();
+        }
+
+        if(mocks != null) {
+            mocks.close();
+        }
+    }
+
+    /**
+     * Launches child frameworks containing the same typed event bundles as the
+     * host framework.
+     *
+     * @param size the total number of frameworks wanted including the host, so
+     *             {@code size - 1} child frameworks are launched
+     * @return the launched child frameworks keyed by framework UUID
+     */
+    private Map<UUID, Framework> createFrameworks(int size) throws BundleException {
+
+        FrameworkFactory ff = ServiceLoader.load(FrameworkFactory.class,
+                FrameworkFactory.class.getClassLoader()).iterator().next();
+
+        List<String> requiredBundles = Arrays.asList(
+                "org.apache.aries.typedevent.bus",
+                "org.apache.aries.typedevent.remote.api",
+                "org.apache.aries.typedevent.remote.spi",
+                "org.apache.aries.typedevent.remote.remoteservices",
+                "org.apache.aries.component-dsl.component-dsl",
+                "org.apache.felix.converter",
+                "org.apache.felix.configadmin",
+                "org.osgi.service.typedevent",
+                "org.osgi.util.function",
+                "org.osgi.util.promise",
+                "org.osgi.util.pushstream",
+                "slf4j.api");
+
+        List<String> locations = new ArrayList<>();
+
+        for(Bundle b : bundleContext.getBundles()) {
+            // getSymbolicName() returns null for bundles without a symbolic name
+            String bsn = b.getSymbolicName();
+            if(bsn != null && (requiredBundles.contains(bsn) || bsn.startsWith("ch.qos.logback"))) {
+                locations.add(b.getLocation());
+            }
+        }
+
+        Map<UUID, Framework> frameworks = new HashMap<UUID, Framework>();
+        // Starts at 1 because the host framework counts as the first member
+        for(int i = 1; i < size; i++) {
+            Map<String, String> fwConfig = new HashMap<>();
+            fwConfig.put(Constants.FRAMEWORK_STORAGE, new File(bundleContext.getDataFile(""), "Test-Cluster" + i).getAbsolutePath());
+            fwConfig.put(Constants.FRAMEWORK_STORAGE_CLEAN, Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+            Framework f = ff.newFramework(fwConfig);
+            f.init();
+            for(String s : locations) {
+                f.getBundleContext().installBundle(s);
+            }
+            f.start();
+            f.adapt(FrameworkWiring.class).resolveBundles(Collections.emptySet());
+            for(Bundle b : f.getBundleContext().getBundles()) {
+                // Fragments cannot be started
+                if(b.getHeaders().get(Constants.FRAGMENT_HOST) == null) {
+                    b.start();
+                }
+            }
+            frameworks.put(getUUID(f), f);
+        }
+        return frameworks;
+    }
+
+    private UUID getMasterFrameworkUUID() {
+        return UUID.fromString(bundleContext.getProperty(Constants.FRAMEWORK_UUID));
+    }
+
+    private UUID getUUID(Framework f) {
+        return UUID.fromString(f.getBundleContext().getProperty(Constants.FRAMEWORK_UUID));
+    }
+
+
+    /**
+     * Service factory that hands each requesting bundle a proxy implementing
+     * that bundle's own view of the named interface and forwarding calls to a
+     * delegate object.
+     */
+    public static class EventHandlerFactory implements ServiceFactory<Object> {
+
+        private final Object delegate;
+        private final String typeToMimic;
+
+        /**
+         * @param delegate the object that receives the forwarded calls
+         * @param typeToMimic the name of the interface the proxy implements
+         */
+        public EventHandlerFactory(Object delegate, String typeToMimic) {
+            this.delegate = delegate;
+            this.typeToMimic = typeToMimic;
+        }
+
+        @Override
+        public Object getService(Bundle bundle, ServiceRegistration<Object> registration) {
+
+            try {
+                Class<?> loadClass = bundle.loadClass(typeToMimic);
+
+                return Proxy.newProxyInstance(loadClass.getClassLoader(), new Class<?>[] {loadClass},
+                        (o,m,a) -> {
+
+                            if(m.getName().startsWith("notify") && m.getParameterTypes().length > 0) {
+                                // Resolve the method on the delegate's own class rather
+                                // than invoking the one from the requesting bundle's
+                                // copy of the interface
+                                return delegate.getClass().getMethod(m.getName(), m.getParameterTypes())
+                                        .invoke(delegate, a);
+                            } else {
+                                return m.invoke(delegate, a);
+                            }
+                        });
+
+            } catch (Exception e) {
+                throw new ServiceException("failed to create service", e);
+            }
+        }
+
+        @Override
+        public void ungetService(Bundle bundle, ServiceRegistration<Object> registration, Object service) {
+            // Nothing to release: the proxy holds no resources
+        }
+
+    }
+
+    /**
+     * Checks that an event is unhandled while nobody listens, and that once a
+     * handler is registered in the other framework the event reaches it
+     * without a second unhandled notification.
+     */
+    @Test
+    public void testSendToRemoteFramework() throws InterruptedException {
+
+        Dictionary<String, Object> props = new Hashtable<>();
+        regs.add(bundleContext.registerService(UNHANDLED_HANDLER, unhandledEventHandler, props));
+
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+
+        bus.deliver(event);
+
+
+        verify(unhandledEventHandler, Mockito.after(100).times(1))
+            .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+
+        BundleContext remoteContext = frameworks.values().stream()
+                .filter(fw -> !getUUID(fw).equals(getMasterFrameworkUUID()))
+                .flatMap(fw -> Arrays.stream(fw.getBundleContext().getBundles()))
+                .filter(b -> "org.osgi.service.typedevent".equals(b.getSymbolicName()))
+                .map(Bundle::getBundleContext)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError(
+                        "No remote framework contains the org.osgi.service.typedevent bundle"));
+
+        props = new Hashtable<>();
+        props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        props.put(TYPED_EVENT_FILTER, "(message=boo)");
+
+        regs.add(remoteContext.registerService(UNTYPED_HANDLER,
+                new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));
+
+
+        bus.deliver(event);
+
+        // Still exactly one unhandled notification: the second event was handled remotely
+        verify(unhandledEventHandler, Mockito.after(1000).times(1))
+            .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+        verify(untypedEventHandler)
+            .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
new file mode 100644
index 0000000..704e7b4
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. + +-tester: biz.aQute.tester.junit-platform + +-runfw: org.apache.felix.framework + +-runrequires: bnd.identity;id="org.apache.aries.typedevent.remote.remoteservices-tests",\ + bnd.identity;id="junit-jupiter-engine",\ + bnd.identity;id="junit-platform-launcher" + +-runsystempackages: sun.reflect + +-resolve.effective: active +-runbundles: \ + ch.qos.logback.classic;version='[1.2.3,1.2.4)',\ + ch.qos.logback.core;version='[1.2.3,1.2.4)',\ + org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\ + org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\ + org.apache.felix.converter;version='[1.0.14,1.0.15)',\ + org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\ + org.osgi.util.function;version='[1.1.0,1.1.1)',\ + org.osgi.util.promise;version='[1.1.1,1.1.2)',\ + org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\ + slf4j.api;version='[1.7.30,1.7.31)',\ + junit-jupiter-api;version='[5.6.2,5.6.3)',\ + junit-platform-commons;version='[1.6.2,1.6.3)',\ + net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\ + net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\ + org.mockito.mockito-core;version='[3.5.10,3.5.11)',\ + org.objenesis;version='[3.1.0,3.1.1)',\ + org.opentest4j;version='[1.2.0,1.2.1)',\ + org.osgi.test.common;version='[0.9.0,0.9.1)',\ + org.osgi.test.junit5;version='[0.9.0,0.9.1)',\ + junit-platform-engine;version='[1.6.2,1.6.3)',\ + junit-platform-launcher;version='[1.6.2,1.6.3)',\ + junit-jupiter-engine;version='[5.6.2,5.6.3)',\ + org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\ + org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\ + org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\ + org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\ + org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)' diff --git 
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml new file mode 100644 index 0000000..a288407 --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml @@ -0,0 +1,50 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.aries.typedevent</groupId> + <artifactId>typedevent-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.aries.typedevent.remote</groupId> + <artifactId>org.apache.aries.typedevent.remote.spi</artifactId> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.typedevent</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>osgi.annotation</artifactId> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.annotation.bundle</artifactId> + </dependency> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.converter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.aries.typedevent.remote</groupId> + <artifactId>org.apache.aries.typedevent.remote.api</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>biz.aQute.bnd</groupId> + <artifactId>bnd-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git 
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java new file mode 100644 index 0000000..ff3a13e --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aries.typedevent.remote.spi;
+
+import static java.lang.Boolean.TRUE;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.UntypedEventHandler;
+
+/**
+ * A simple helper class used to manage the registrations of {@link UntypedEventHandler}
+ * services in the local service registry, used to feed events into the remote events
+ * implementation.
+ *
+ * Implementations should extend this type and override the {@link #notifyUntyped(String, Map)} method
+ * to receive events. The set of events received can be altered by calling {@link #updateTargets(Map)}.
+ *
+ * All mutable state is guarded by a private lock; service registry calls are
+ * made outside that lock.
+ */
+public abstract class LocalEventConsumerManager implements UntypedEventHandler {
+
+    /**
+     * A service property indicating that the event handler is a proxy created for a remote node and so
+     * should not be considered as a local interest.
+     */
+    public static final String ARIES_LOCAL_EVENT_PROXY = "org.apache.aries.typedevent.remote.spi.local.proxy";
+
+    /**
+     * A filter to exclude local proxy interests from remote nodes
+     */
+    public static final String ARIES_LOCAL_EVENT_PROXY_EXCLUSION_FILTER = "(!(" + ARIES_LOCAL_EVENT_PROXY + "=true))";
+
+    private final Object lock = new Object();
+    /** Live whiteboard registrations, keyed by topic. Guarded by {@link #lock}. */
+    private final Map<String, ServiceRegistration<UntypedEventHandler>> listenerRegistrations = new HashMap<>();
+    /** Requested topics and their (possibly null or empty) filters. Guarded by {@link #lock}. */
+    private final Map<String, String> topicsToFilters = new HashMap<>();
+    /** The context used for registration, or null while stopped. Guarded by {@link #lock}. */
+    private BundleContext ctx;
+
+    /**
+     * Starts this manager, registering any necessary whiteboard services with the
+     * appropriate topic and filters.
+     *
+     * @param ctx the bundle context used to register the whiteboard services
+     */
+    public final void start(BundleContext ctx) {
+        synchronized (lock) {
+            this.ctx = ctx;
+        }
+        updateServiceRegistrations();
+    }
+
+    /**
+     * Stops this manager, unregistering any whiteboard services
+     */
+    public final void stop() {
+        Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+        // Clear the context and take the registrations in one step, so that a
+        // concurrent update can tell that its registration arrived too late
+        synchronized (lock) {
+            this.ctx = null;
+            toUnregister = new HashMap<>(listenerRegistrations);
+            listenerRegistrations.clear();
+        }
+        toUnregister.values().forEach(this::safeUnregister);
+    }
+
+    /**
+     * Brings the registered whiteboard services in line with the requested
+     * targets, repeating until the targets stop changing underneath it.
+     */
+    private void updateServiceRegistrations() {
+        boolean changed;
+        do {
+            Map<String, String> possibleUpdates;
+            Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+            synchronized (lock) {
+                possibleUpdates = new HashMap<>(topicsToFilters);
+                toUnregister = listenerRegistrations.entrySet().stream()
+                    .filter(e -> !topicsToFilters.containsKey(e.getKey()))
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+                listenerRegistrations.keySet().removeAll(toUnregister.keySet());
+            }
+
+            toUnregister.values().forEach(this::safeUnregister);
+
+            for (Entry<String, String> entry : possibleUpdates.entrySet()) {
+                syncRegistration(entry.getKey(), entry.getValue());
+            }
+
+            synchronized (lock) {
+                changed = !possibleUpdates.equals(topicsToFilters);
+            }
+        } while (changed);
+    }
+
+    /**
+     * Registers a handler for the topic, or updates the existing handler's
+     * filter if it differs. Does nothing while the manager is stopped.
+     */
+    private void syncRegistration(String topic, String filter) {
+        ServiceRegistration<UntypedEventHandler> reg;
+        BundleContext ctx;
+        synchronized (lock) {
+            reg = listenerRegistrations.get(topic);
+            ctx = this.ctx;
+        }
+
+        if(ctx == null) {
+            return;
+        }
+
+        boolean hasFilter = filter != null && !filter.isEmpty();
+
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put(TYPED_EVENT_TOPICS, topic);
+        props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
+        if(hasFilter) {
+            props.put(TYPED_EVENT_FILTER, filter);
+        }
+
+        if(reg == null) {
+            ServiceRegistration<UntypedEventHandler> created =
+                    ctx.registerService(UntypedEventHandler.class, this, props);
+
+            boolean stored;
+            synchronized (lock) {
+                // Keep the registration only if the manager is still running on
+                // the same context and nobody registered this topic meanwhile;
+                // otherwise it would outlive stop() and never be unregistered
+                stored = this.ctx == ctx && listenerRegistrations.putIfAbsent(topic, created) == null;
+            }
+            if(!stored) {
+                safeUnregister(created);
+            }
+        } else {
+            try {
+                Object current = reg.getReference().getProperty(TYPED_EVENT_FILTER);
+                boolean unchanged = hasFilter ? filter.equals(current) : current == null;
+                if(unchanged) {
+                    // Filter unchanged - no need to update
+                    return;
+                }
+                reg.setProperties(props);
+            } catch (IllegalStateException ise) {
+                // The registration was unregistered concurrently - nothing to update
+            }
+        }
+    }
+
+    private void safeUnregister(ServiceRegistration<?> reg) {
+        try {
+            reg.unregister();
+        } catch (IllegalStateException ise) {
+            // Just ignore it
+        }
+    }
+
+    /**
+     * Set the topic and filter targets for which whiteboard listeners
+     * should be registered
+     * @param updated - A Map of topic names (or globs) to filters
+     */
+    protected final void updateTargets(Map<String, String> updated) {
+        synchronized (lock) {
+            topicsToFilters.clear();
+            topicsToFilters.putAll(updated);
+        }
+
+        updateServiceRegistrations();
+    }
+
+}
diff --git
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java new file mode 100644 index 0000000..61ef87f --- /dev/null +++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.aries.typedevent.remote.spi;

import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.REMOTE_EVENT_MARKER;

import java.time.Instant;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.aries.typedevent.remote.api.FilterDTO;
import org.apache.aries.typedevent.remote.api.RemoteEventMonitor;
import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent;
import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent.PublishType;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.service.typedevent.monitor.MonitorEvent;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.util.converter.Converters;
import org.osgi.util.function.Predicate;
import org.osgi.util.pushstream.PushStream;

/**
 * A {@link RemoteEventMonitor} backed by a local {@link TypedEventMonitor}. Every monitored
 * event is converted to a {@link RemoteMonitorEvent}, tagged as locally or remotely published,
 * and then matched against the supplied {@link FilterDTO}s.
 */
public class RemoteEventMonitorImpl implements RemoteEventMonitor {

    private final TypedEventMonitor monitor;

    /**
     * @param monitor the local monitor that supplies the events
     */
    public RemoteEventMonitorImpl(TypedEventMonitor monitor) {
        this.monitor = monitor;
    }

    /**
     * Converts a local monitor event into its remote representation. The event is marked
     * {@link PublishType#REMOTE} when its data contains {@code REMOTE_EVENT_MARKER} with a
     * value whose string form is "true" (ignoring case), and {@link PublishType#LOCAL}
     * otherwise.
     */
    private static RemoteMonitorEvent toRemoteEvent(MonitorEvent event) {
        Object remoteMarker = event.eventData == null ? null : event.eventData.get(REMOTE_EVENT_MARKER);

        RemoteMonitorEvent remoteEvent = Converters.standardConverter().convert(event)
                .sourceAsDTO().targetAsDTO().to(RemoteMonitorEvent.class);
        remoteEvent.publishType = Boolean.parseBoolean(String.valueOf(remoteMarker))
                ? PublishType.REMOTE : PublishType.LOCAL;

        return remoteEvent;
    }

    /**
     * Monitors events without any history replay.
     *
     * @throws IllegalArgumentException if a filter contains an invalid LDAP or regular expression
     */
    @Override
    public PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters) {
        return monitorEvents(0, filters);
    }

    /**
     * Monitors events, asking the local monitor for {@code history} past events.
     *
     * @throws IllegalArgumentException if a filter contains an invalid LDAP or regular expression
     */
    @Override
    public PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO... filters) {
        // Compile the filters first so that an invalid filter fails before a stream exists.
        Predicate<RemoteMonitorEvent> selector = createFilter(filters);
        return monitor.monitorEvents(history)
                .map(RemoteEventMonitorImpl::toRemoteEvent)
                .filter(selector);
    }

    /**
     * Monitors events, asking the local monitor for past events since {@code history}.
     *
     * @throws IllegalArgumentException if a filter contains an invalid LDAP or regular expression
     */
    @Override
    public PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO... filters) {
        // Compile the filters first so that an invalid filter fails before a stream exists.
        Predicate<RemoteMonitorEvent> selector = createFilter(filters);
        return monitor.monitorEvents(history)
                .map(RemoteEventMonitorImpl::toRemoteEvent)
                .filter(selector);
    }

    /**
     * The compiled form of one {@link FilterDTO}. Either part may be {@code null}, meaning
     * that the DTO did not set (or set to empty) the corresponding expression.
     */
    private static final class FilterPair {
        final Filter ldap;
        final Pattern regex;

        /**
         * @throws IllegalArgumentException if the LDAP expression is invalid, or (as a
         *         {@link java.util.regex.PatternSyntaxException}) if the regular expression is
         */
        FilterPair(FilterDTO filter) {
            ldap = isSet(filter.ldapExpression) ? compileLdap(filter.ldapExpression) : null;
            regex = isSet(filter.regularExpression) ? Pattern.compile(filter.regularExpression) : null;
        }

        private static boolean isSet(String expression) {
            return expression != null && !expression.isEmpty();
        }

        private static Filter compileLdap(String expression) {
            try {
                return FrameworkUtil.createFilter(expression);
            } catch (InvalidSyntaxException e) {
                throw new IllegalArgumentException(e);
            }
        }

        /** If both an LDAP and a regular expression are present then both must match. */
        boolean matches(Map<String, Object> properties, CharSequence text) {
            return (ldap == null || ldap.matches(properties))
                    && (regex == null || regex.matcher(text).find());
        }
    }

    /**
     * Builds the predicate applied to each event. An event is accepted when no filters are
     * supplied, or when at least one of the supplied filters matches it. A {@code null}
     * array and {@code null} array elements are ignored.
     *
     * @throws IllegalArgumentException if a filter contains an invalid LDAP or regular expression
     */
    private Predicate<RemoteMonitorEvent> createFilter(FilterDTO... filters) {
        if (filters == null) {
            return x -> true;
        }

        List<FilterPair> filterPairs = Arrays.stream(filters)
                .filter(f -> f != null)
                .map(FilterPair::new)
                .collect(Collectors.toList());

        if (filterPairs.isEmpty()) {
            return x -> true;
        }

        // The event text is only needed for regular expressions; decide that once here
        // rather than once per event.
        boolean needsEventText = filterPairs.stream().anyMatch(p -> p.regex != null);

        return event -> {
            // We use a TreeMap to ensure predictable ordering of keys
            // This is important for the regex matching contract.
            SortedMap<String, Object> toFilter = new TreeMap<>();

            // A collector is not used here because it fails on null values, which the
            // TreeMap itself supports.
            if (event.eventData != null) {
                event.eventData.entrySet().stream()
                        .flatMap(e -> flatten("", e))
                        .forEach(e -> toFilter.put(e.getKey(), e.getValue()));
            }

            toFilter.put("-topic", event.topic);
            toFilter.put("-publishType", event.publishType);

            // Regular expressions are matched against "key:value," pairs in key order.
            StringBuilder eventText = new StringBuilder();
            if (needsEventText) {
                toFilter.forEach((k, v) -> eventText.append(k).append(':').append(v).append(','));
            }

            return filterPairs.stream().anyMatch(p -> p.matches(toFilter, eventText));
        };
    }

    /**
     * Flattens an entry into one or more entries with dotted keys: a nested map stored under
     * {@code x} with key {@code y} becomes {@code x.y}. Maps are flattened recursively; any
     * other value is emitted as a single entry. Nested keys are converted with string
     * concatenation, so nested maps with non-String keys are supported.
     *
     * @param parentScope the key prefix accumulated so far, empty for top-level entries
     * @param entry the entry to flatten
     */
    private static Stream<Entry<String, Object>> flatten(String parentScope, Entry<?, ?> entry) {
        String key = parentScope + entry.getKey();
        Object value = entry.getValue();

        if (value instanceof Map) {
            String keyPrefix = key + ".";
            Map<?, ?> nested = (Map<?, ?>) value;
            return nested.entrySet().stream().flatMap(e -> flatten(keyPrefix, e));
        }

        return Stream.of(new AbstractMap.SimpleEntry<>(key, value));
    }

}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java
new file mode 100644
index 0000000..a08926c
--- /dev/null
+++
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@org.osgi.annotation.bundle.Export +@org.osgi.annotation.versioning.Version("0.0.1") +package org.apache.aries.typedevent.remote.spi; \ No newline at end of file diff --git a/org.apache.aries.typedevent.remote/pom.xml b/org.apache.aries.typedevent.remote/pom.xml new file mode 100644 index 0000000..79ff373 --- /dev/null +++ b/org.apache.aries.typedevent.remote/pom.xml @@ -0,0 +1,21 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.aries.typedevent</groupId> + <artifactId>typedevent-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.aries.typedevent.remote</groupId> + <artifactId>org.apache.aries.typedevent.remote</artifactId> + <packaging>pom</packaging> + + + <modules> + <module>org.apache.aries.typedevent.remote.api</module> + 
<module>org.apache.aries.typedevent.remote.spi</module> + <module>org.apache.aries.typedevent.remote.remoteservices</module> + </modules> +</project> \ No newline at end of file diff --git a/pom.xml b/pom.xml index dfb1c98..7e89a7d 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,18 @@ <enabled>false</enabled> </releases> </repository> + <repository> + <id>Sonatype-snapshots</id> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + <snapshots> + <enabled>true</enabled> + <updatePolicy>daily</updatePolicy> + <checksumPolicy>ignore</checksumPolicy> + </snapshots> + <releases> + <enabled>false</enabled> + </releases> + </repository> </repositories> <properties> @@ -322,5 +334,6 @@ <modules> <module>typedevent-test-bom</module> <module>org.apache.aries.typedevent.bus</module> + <module>org.apache.aries.typedevent.remote</module> </modules> </project>
