This is an automated email from the ASF dual-hosted git repository. timothyjward pushed a commit to branch feature/v1.1 in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
commit 7b6220ac1c4b5d1736477789cdd259444896a39c Author: Tim Ward <[email protected]> AuthorDate: Wed Mar 6 15:58:40 2024 +0000 Add support for Records as events This commit makes the Aries Typed Event bundle a multi-release bundle which works on Java 8, but adds support for Record types on Java 16 or later. Event Publishers and Event Handlers can use Record types as normal. The build now requires Java 17 (the LTS release) as a result of this change. Signed-off-by: Tim Ward <[email protected]> --- org.apache.aries.typedevent.bus/bnd.bnd | 1 + org.apache.aries.typedevent.bus/pom.xml | 27 ++ .../aries/typedevent/bus/impl/EventConverter.java | 10 +- .../aries/typedevent/bus/impl/RecordConverter.java | 36 +++ .../aries/typedevent/bus/impl/TopicHistory.java | 1 - .../typedevent/bus/impl/TypedEventBusImpl.java | 2 +- .../aries/typedevent/bus/impl/RecordConverter.java | 102 ++++++++ .../typedevent/bus/osgi/RecordIntegrationTest.java | 272 +++++++++++++++++++++ pom.xml | 2 +- 9 files changed, 448 insertions(+), 5 deletions(-) diff --git a/org.apache.aries.typedevent.bus/bnd.bnd b/org.apache.aries.typedevent.bus/bnd.bnd new file mode 100644 index 0000000..6cd22a6 --- /dev/null +++ b/org.apache.aries.typedevent.bus/bnd.bnd @@ -0,0 +1 @@ +Multi-Release: true \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/pom.xml b/org.apache.aries.typedevent.bus/pom.xml index fc6c662..4cd7b3a 100644 --- a/org.apache.aries.typedevent.bus/pom.xml +++ b/org.apache.aries.typedevent.bus/pom.xml @@ -145,6 +145,33 @@ <groupId>biz.aQute.bnd</groupId> <artifactId>bnd-run-maven-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>java16-compile</id> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <compileSourceRoots>${project.basedir}/src/main/java16</compileSourceRoots> + <source>16</source> + <target>16</target> + <release>16</release> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + <execution> + <id>default-testCompile</id> + <configuration> + <source>16</source> + <target>16</target> + <release>16</release> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java index 93d0d6f..4ca11a5 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java @@ -61,7 +61,7 @@ public class EventConverter { }; private static final TypeReference<Set<Object>> SET_OF_OBJECTS = new TypeReference<Set<Object>>() { }; - private static final TypeReference<Map<String, Object>> MAP_WITH_STRING_KEYS = new TypeReference<Map<String, Object>>() { + static final TypeReference<Map<String, Object>> MAP_WITH_STRING_KEYS = new TypeReference<Map<String, Object>>() { }; private static final TypeReference<Map<Object, Object>> MAP_OF_OBJECT_TO_OBJECT = new TypeReference<Map<Object, Object>>() { }; @@ -100,10 +100,16 @@ public class EventConverter { specialClasses.add(ZonedDateTime.class); specialClasses.add(UUID.class); - eventConverter = Converters.standardConverter().newConverterBuilder().rule(EventConverter::convert) + eventConverter = Converters.standardConverter().newConverterBuilder() + .rule(EventConverter::convertRecord) + .rule(EventConverter::convert) .errorHandler(EventConverter::attemptRecovery).build(); } + static Object convertRecord(Object o, Type target) { + return RecordConverter.convert(eventConverter, o, target); + } + static Object convert(Object o, Type target) { if (target != Object.class || o == null) { diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/RecordConverter.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/RecordConverter.java new file mode 100644 index 0000000..ef0a516 --- /dev/null +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/RecordConverter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.aries.typedevent.bus.impl; + +import java.lang.reflect.Type; + +import org.osgi.util.converter.Converter; +import org.osgi.util.converter.ConverterFunction; + +/** + * This class is responsible for converting Record events to and from their + * "flattened" representations. As Java 8 doesn't support Records this is + * not handled + */ +public class RecordConverter { + + static Object convert(Converter converter, Object o, Type target) { + return ConverterFunction.CANNOT_HANDLE; + } + +} \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java index 8e5703e..4dbceff 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java @@ -20,7 +20,6 @@ package org.apache.aries.typedevent.bus.impl; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; -import java.util.Iterator; import java.util.List; import java.util.Map.Entry; diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java index 2f7d360..a056d0f 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java @@ -154,7 +154,7 @@ public class TypedEventBusImpl implements TypedEventBus { } } else { Class<?> toCheck = handler.getClass(); - outer: while(clazz == null) { + outer: while(clazz == null && toCheck != null) { clazz = findDirectlyImplemented(toCheck); if(clazz != null) { diff --git a/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java b/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java new file mode 100644 index 0000000..1e32c3c --- /dev/null +++ b/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.aries.typedevent.bus.impl; + +import static java.util.stream.Collectors.toMap; + +import java.lang.reflect.RecordComponent; +import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Map; + +import org.osgi.util.converter.ConversionException; +import org.osgi.util.converter.Converter; +import org.osgi.util.converter.ConverterFunction; + +/** + * This class is responsible for converting Record events to and from their + * "flattened" representations. This version runs on Java 17 + */ +public class RecordConverter { + + static Object convert(Converter converter, Object o, Type target) { + + if (Record.class.isInstance(o)) { + RecordComponent[] sourceComponents = o.getClass().getRecordComponents(); + + if(target instanceof Class<?> clz && Record.class.isAssignableFrom(clz)) { + RecordComponent[] targetComponents = clz.getRecordComponents(); + Object[] args = new Object[targetComponents.length]; + Class<?>[] argTypes = new Class<?>[targetComponents.length]; + for(int i = 0; i < targetComponents.length; i++) { + RecordComponent targetComponent = targetComponents[i]; + String name = targetComponent.getName(); + Object arg = null; + for(int j = 0; j < sourceComponents.length; j++) { + if(sourceComponents[j].getName().equals(name)) { + Object sourceArg = getComponentValue(sourceComponents[j], o); + Type targetArgType = targetComponent.getGenericType(); + arg = converter.convert(sourceArg).to(targetArgType); + break; + } + } + args[i] = arg; + argTypes[i] = targetComponent.getType(); + } + return createRecord(clz, args, argTypes); + } else { + Map<String, Object> converted = Arrays.stream(sourceComponents) + .collect(toMap(RecordComponent::getName, rc -> getComponentValue(rc, o))); + + return converter.convert(converted).to(target); + } + } else if(target instanceof Class<?> clz && Record.class.isAssignableFrom(clz)) { + Map<String, Object> intermediate = converter.convert(o).to(EventConverter.MAP_WITH_STRING_KEYS); + RecordComponent[] targetComponents = clz.getRecordComponents(); + Object[] args = new Object[targetComponents.length]; + Class<?>[] argTypes = new Class<?>[targetComponents.length]; + for(int i = 0; i < targetComponents.length; i++) { + RecordComponent targetComponent = targetComponents[i]; + Object sourceArg = intermediate.get(targetComponent.getName()); + Type targetArgType = targetComponent.getGenericType(); + args[i] = converter.convert(sourceArg).to(targetArgType); + argTypes[i] = targetComponent.getType(); + } + return createRecord(clz, args, argTypes); + } + + return ConverterFunction.CANNOT_HANDLE; + + } + + private static Object createRecord(Class<?> clz, Object[] args, Class<?>[] argTypes) { + try { + return clz.getDeclaredConstructor(argTypes).newInstance(args); + } catch (Exception e) { + throw new ConversionException("Unable to instantiate record component " + clz.getName(), e); + } + } + + private static Object getComponentValue(RecordComponent rc, Object o) { + try { + return rc.getAccessor().invoke(o); + } catch (Exception e) { + throw new ConversionException("Unable to process record component " + rc.getName() + " from type " + rc.getDeclaringRecord().getName(), e); + } + } +} \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/RecordIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/RecordIntegrationTest.java new file mode 100644 index 0000000..8b5d69d --- /dev/null +++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/RecordIntegrationTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or eventBusied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.typedevent.bus.osgi; + +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER; +import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS; + +import java.util.Dictionary; +import java.util.Hashtable; + +import org.apache.aries.typedevent.bus.common.TestEvent; +import org.apache.aries.typedevent.bus.common.TestEventConsumer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.osgi.framework.BundleContext; +import org.osgi.service.typedevent.TypedEventBus; +import org.osgi.service.typedevent.TypedEventHandler; +import org.osgi.test.common.annotation.InjectBundleContext; +import org.osgi.test.common.annotation.InjectService; +import org.osgi.test.junit5.context.BundleContextExtension; +import org.osgi.test.junit5.service.ServiceExtension; + +/** + * This is a JUnit test that will be run inside an OSGi framework. + * + * It can interact with the framework by starting or stopping bundles, + * getting or registering services, or in other ways, and then observing + * the result on the bundle(s) being tested. + */ +@ExtendWith(BundleContextExtension.class) +@ExtendWith(ServiceExtension.class) +@ExtendWith(MockitoExtension.class) +public class RecordIntegrationTest extends AbstractIntegrationTest { + + private static final String TOPIC = "org/apache/aries/test/record"; + + @InjectBundleContext + BundleContext context; + + @InjectService + TypedEventBus eventBus; + + @Mock + TestEventConsumer typedEventHandler; + + @Mock + TestRecordListener recordEventHandler; + + @Test + public void testUnFilteredListenerEventToRecord() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Event to record + + TestEvent event = new TestEvent(); + event.message = "foo"; + + eventBus.deliver(TOPIC, event); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("foo"))); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("foo"))); + } + + @Test + public void testUnFilteredListenerRecordToEvent() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Record to Event + TestRecord testRecord = new TestRecord("bar"); + + eventBus.deliver(TOPIC, testRecord); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("bar"))); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("bar"))); + + } + + @Test + public void testUnFilteredListenerRecordToRecord() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Record to Record + TestRecord2 testRecord2 = new TestRecord2("foobar", 5); + + eventBus.deliver(TOPIC, testRecord2); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("foobar"))); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("foobar"))); + + } + + @Test + public void testFilteredListenerEventToRecord() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=foo)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=bar)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Event to record + + TestEvent event = new TestEvent(); + event.message = "foo"; + + eventBus.deliver(TOPIC, event); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("foo"))); + + Mockito.verify(recordEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("foo"))); + + + event = new TestEvent(); + event.message = "bar"; + + eventBus.deliver(TOPIC, event); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("bar"))); + + Mockito.verify(typedEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("bar"))); + } + + @Test + public void testFilteredListenerRecordToEvent() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=foo)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=bar)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Record to Event + TestRecord testRecord = new TestRecord("foo"); + + eventBus.deliver(TOPIC, testRecord); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("foo"))); + + Mockito.verify(recordEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("foo"))); + + + testRecord = new TestRecord("bar"); + + eventBus.deliver(TOPIC, testRecord); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("bar"))); + + Mockito.verify(typedEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("bar"))); + } + + @Test + public void testFilteredListenerRecordToRecord() throws Exception { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=foo)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props)); + + props = new Hashtable<>(); + props.put(TYPED_EVENT_FILTER, "(message=bar)"); + props.put(TYPED_EVENT_TOPICS, TOPIC); + + regs.add(context.registerService(TypedEventHandler.class, recordEventHandler, props)); + + // Record to Record + TestRecord2 testRecord2 = new TestRecord2("foo", 5); + + eventBus.deliver(TOPIC, testRecord2); + + Mockito.verify(typedEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("foo"))); + + Mockito.verify(recordEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("foo"))); + + + testRecord2 = new TestRecord2("bar", 5); + + eventBus.deliver(TOPIC, testRecord2); + + Mockito.verify(recordEventHandler, Mockito.timeout(1000)) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestRecordWithMessage("bar"))); + + Mockito.verify(typedEventHandler, Mockito.after(1000).never()) + .notify(Mockito.eq(TOPIC), Mockito.argThat(isTestEventWithMessage("bar"))); + } + + public interface TestRecordListener extends TypedEventHandler<TestRecord> {} + + public record TestRecord(String message) {} + + public record TestRecord2(String message, int count) {} + + protected ArgumentMatcher<TestRecord> isTestRecordWithMessage(String message) { + return new ArgumentMatcher<TestRecord>() { + + @Override + public boolean matches(TestRecord argument) { + return message.equals(argument.message); + } + }; + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6884091..ecd1266 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ </repositories> <properties> - <bnd.version>6.4.0</bnd.version> + <bnd.version>7.0.0</bnd.version> <dsl.version>1.2.2</dsl.version> <junit.version>5.10.0</junit.version> <mockito.version>5.5.0</mockito.version>
