This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/feature/v1.1 by this push:
new 8ed6ad3 Add support for an event conversion plugin
8ed6ad3 is described below
commit 8ed6ad311a570ace2737e3ec7cbbded855c6c9c3
Author: Tim Ward <[email protected]>
AuthorDate: Thu Dec 12 18:25:54 2024 +0000
Add support for an event conversion plugin
Event conversion is left up to the implementation, and Aries uses the OSGi
Converter. However, some users want to use richer types than the specification
supports. They should be able to register an override that lets them perform
the conversion they need.
Forward ported from the main branch
Signed-off-by: Tim Ward <[email protected]>
(cherry picked from commit 262c1c1861bf439b10bf2cc501f577ea47e8465b)
---
.../aries/typedevent/bus/impl/EventConverter.java | 38 +++++--
.../{TypedHistoryReplayTask.java => TypeData.java} | 43 ++++---
.../bus/impl/TypedEventBusActivator.java | 5 +-
.../typedevent/bus/impl/TypedEventBusImpl.java | 95 ++++++++++------
.../aries/typedevent/bus/impl/TypedEventTask.java | 4 +-
.../bus/impl/TypedHistoryReplayTask.java | 11 +-
.../aries/typedevent/bus/spi/AriesTypedEvents.java | 45 ++++++++
.../typedevent/bus/spi/CustomEventConverter.java | 45 ++++++++
.../aries/typedevent/bus/spi/package-info.java | 18 +++
.../typedevent/bus/impl/EventConverterTest.java | 124 ++++++++++++++++++---
.../bus/osgi/EventDeliveryIntegrationTest.java | 48 ++++++++
11 files changed, 394 insertions(+), 82 deletions(-)
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java
index 4ca11a5..c9419db 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventConverter.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.aries.typedevent.bus.spi.CustomEventConverter;
import org.osgi.framework.Filter;
import org.osgi.util.converter.Converter;
import org.osgi.util.converter.ConverterFunction;
@@ -194,24 +195,27 @@ public class EventConverter {
}
private final Object originalEvent;
+ private final CustomEventConverter custom;
private Map<String, ?> untypedEventDataForFiltering;
- private EventConverter(Object event) {
+ private EventConverter(Object event, CustomEventConverter custom) {
this.originalEvent = event;
+ this.custom = custom;
}
- private EventConverter(Map<String, ?> untypedEvent) {
+ private EventConverter(Map<String, ?> untypedEvent, CustomEventConverter
custom) {
this.originalEvent = untypedEvent;
+ this.custom = custom;
this.untypedEventDataForFiltering = untypedEvent;
}
- public static EventConverter forTypedEvent(Object event) {
- return new EventConverter(event);
+ public static EventConverter forTypedEvent(Object event,
CustomEventConverter custom) {
+ return new EventConverter(event, custom);
}
- public static EventConverter forUntypedEvent(Map<String, ?> event) {
- return new EventConverter(event);
+ public static EventConverter forUntypedEvent(Map<String, ?> event,
CustomEventConverter custom) {
+ return new EventConverter(event, custom);
}
public boolean applyFilter(Filter f) {
@@ -227,16 +231,28 @@ public class EventConverter {
public Map<String, ?> toUntypedEvent() {
if (untypedEventDataForFiltering == null) {
- untypedEventDataForFiltering =
eventConverter.convert(originalEvent).sourceAsDTO().to(MAP_WITH_STRING_KEYS);
+ if(custom == null ||
+ (untypedEventDataForFiltering =
custom.toUntypedEvent(originalEvent)) == null ) {
+ untypedEventDataForFiltering =
eventConverter.convert(originalEvent).sourceAsDTO().to(MAP_WITH_STRING_KEYS);
+ }
}
return untypedEventDataForFiltering;
}
- public <T> T toTypedEvent(Class<T> clazz) {
- if (clazz.isInstance(originalEvent)) {
- return clazz.cast(originalEvent);
+ @SuppressWarnings("unchecked")
+ public <T> T toTypedEvent(TypeData targetEventClass) {
+ Class<?> rawType = targetEventClass.getRawType();
+ Type genericTarget = targetEventClass.getType();
+ if (rawType.isInstance(originalEvent) && rawType ==
genericTarget) {
+ return (T) originalEvent;
} else {
- return
eventConverter.convert(originalEvent).targetAsDTO().to(clazz);
+ if(custom != null) {
+ Object result =
custom.toTypedEvent(originalEvent, rawType, genericTarget);
+ if(result != null) {
+ return (T) result;
+ }
+ }
+ return
eventConverter.convert(originalEvent).targetAsDTO().to(genericTarget);
}
}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java
similarity index 52%
copy from
org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
copy to
org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java
index 7525fd2..af58df0 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypeData.java
@@ -14,28 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.aries.typedevent.bus.impl;
-
-import java.util.List;
-import org.osgi.service.typedevent.TypedEventHandler;
-import org.osgi.service.typedevent.monitor.MonitorEvent;
+package org.apache.aries.typedevent.bus.impl;
-public class TypedHistoryReplayTask extends HistoryReplayTask {
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
- private final TypedEventHandler<Object> handler;
- private final Class<?> eventType;
+/**
+ * The generic type information for the event data
+ */
+public final class TypeData {
+
+ private final Class<?> rawType;
+
+ private final Type type;
- @SuppressWarnings("unchecked")
- public TypedHistoryReplayTask(TypedEventMonitorImpl monitorImpl,
TypedEventHandler<?> handler, Class<?> eventType, List<EventSelector>
selectors, Integer history) {
- super(monitorImpl, selectors, history);
- this.handler = (TypedEventHandler<Object>) handler;
- this.eventType = eventType;
+ public TypeData(Type type) {
+ super();
+ this.type = type;
+ if(type instanceof Class) {
+ this.rawType = (Class<?>) type;
+ } else if (type instanceof ParameterizedType) {
+ this.rawType = (Class<?>) ((ParameterizedType)
type).getRawType();
+ } else {
+ throw new IllegalArgumentException("The type " + type +
+ " is not acceptable. Must be a raw
Class or Parameterized Type");
+ }
}
- @Override
- protected void notifyListener(MonitorEvent me) {
- handler.notify(me.topic, (Object)
EventConverter.forUntypedEvent(me.eventData).toTypedEvent(eventType));
+ public Class<?> getRawType() {
+ return rawType;
}
+ public Type getType() {
+ return type;
+ }
}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
index 43a7b04..28468d0 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
import org.apache.aries.component.dsl.OSGi;
import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.typedevent.bus.spi.AriesTypedEvents;
import org.osgi.annotation.bundle.Header;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
@@ -123,7 +124,9 @@ public class TypedEventBusActivator implements
BundleActivator {
getServiceProps(csr.getServiceReference())),
handler ->
tebi.removeUnhandledEventHandler(handler,
getServiceProps(csr.getServiceReference())))),
- register(TypedEventBus.class, tebi,
serviceProps)
+ register(new String[] {
+
TypedEventBus.class.getName(), AriesTypedEvents.class.getName()
+ }, tebi, serviceProps)
.flatMap(x -> nothing())));
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index c204649..9436d0f 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -29,6 +29,7 @@ import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS
import static org.osgi.util.converter.Converters.standardConverter;
import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
+import org.apache.aries.typedevent.bus.spi.AriesTypedEvents;
+import org.apache.aries.typedevent.bus.spi.CustomEventConverter;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.Bundle;
import org.osgi.framework.Constants;
@@ -59,7 +62,7 @@ import org.osgi.util.converter.TypeReference;
@Capability(namespace=SERVICE_NAMESPACE,
attribute="objectClass:List<String>=org.osgi.service.typedevent.TypedEventBus",
uses=TypedEventBus.class)
@Capability(namespace=IMPLEMENTATION_NAMESPACE,
name=TYPED_EVENT_IMPLEMENTATION, version=TYPED_EVENT_SPECIFICATION_VERSION,
uses=TypedEventBus.class)
-public class TypedEventBusImpl implements TypedEventBus {
+public class TypedEventBusImpl implements TypedEventBus, AriesTypedEvents {
private static final TypeReference<List<String>> LIST_OF_STRINGS = new
TypeReference<List<String>>() {
};
@@ -84,7 +87,7 @@ public class TypedEventBusImpl implements TypedEventBus {
* Map access and mutation must be synchronized on {@link #lock}. Values
from
* the map should be copied as the contents are not thread safe.
*/
- private final Map<TypedEventHandler<?>, Class<?>>
typedHandlersToTargetClasses = new HashMap<>();
+ private final Map<TypedEventHandler<?>, TypeData>
typedHandlersToTargetClasses = new HashMap<>();
/**
* Map access and mutation must be synchronized on {@link #lock}. Values
from
@@ -144,41 +147,59 @@ public class TypedEventBusImpl implements TypedEventBus {
private final Object threadLock = new Object();
+ /**
+ * Access and mutation must be synchronized on {@link #lock}.
+ */
+ private CustomEventConverter customEventConverter;
+
public TypedEventBusImpl(TypedEventMonitorImpl monitorImpl, Map<String, ?>
config) {
this.monitorImpl = monitorImpl;
}
+ public void registerGlobalEventConverter(CustomEventConverter converter,
boolean force) {
+ synchronized (lock) {
+ if(customEventConverter != null && !force) {
+ throw new IllegalStateException("A custom converter is
already set");
+ }
+ customEventConverter = converter;
+ }
+ }
+
void addTypedEventHandler(Bundle registeringBundle, TypedEventHandler<?>
handler, Map<String, Object> properties) {
- Class<?> clazz = discoverTypeForTypedHandler(registeringBundle,
handler, properties);
+ TypeData genType = discoverTypeForTypedHandler(registeringBundle,
handler, properties);
- String defaultTopic = clazz == null ? null :
clazz.getName().replace(".", "/");
+ String defaultTopic = genType == null ? null :
genType.getRawType().getName().replace(".", "/");
- doAddEventHandler(topicsToTypedHandlers,
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic,
properties,
- (l,i) -> new TypedHistoryReplayTask(monitorImpl,
handler, clazz, l, i));
+ // This function must only ever be called while holding #lock
+ BiFunction<List<EventSelector>, Integer, ? extends EventTask>
replayTaskFactory =
+ (l,i) -> new TypedHistoryReplayTask(monitorImpl,
customEventConverter, handler, genType, l, i);
+
+ doAddEventHandler(topicsToTypedHandlers,
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic,
properties,
+ replayTaskFactory);
}
- private Class<?> discoverTypeForTypedHandler(Bundle registeringBundle,
TypedEventHandler<?> handler, Map<String, Object> properties) {
- Class<?> clazz = null;
+ private TypeData discoverTypeForTypedHandler(Bundle registeringBundle,
TypedEventHandler<?> handler, Map<String, Object> properties) {
+ Type genType = null;
Object type = properties.get(TypedEventConstants.TYPED_EVENT_TYPE);
if (type != null) {
try {
- clazz = registeringBundle.loadClass(String.valueOf(type));
+ genType = registeringBundle.loadClass(String.valueOf(type));
} catch (ClassNotFoundException e) {
// TODO Blow up
e.printStackTrace();
}
} else {
Class<?> toCheck = handler.getClass();
- outer: while(clazz == null && toCheck != null) {
- clazz = findDirectlyImplemented(toCheck);
+ outer: while(genType == null && toCheck != null) {
+ genType = findDirectlyImplemented(toCheck);
- if(clazz != null) {
+ if(genType != null) {
break outer;
}
- clazz = processInterfaceHierarchyForClass(toCheck);
+ genType = processInterfaceHierarchyForClass(toCheck);
- if(clazz != null) {
+ if(genType != null) {
break outer;
}
@@ -186,40 +207,42 @@ public class TypedEventBusImpl implements TypedEventBus {
}
}
- if (clazz != null) {
+ if (genType != null) {
+ TypeData typeData = new TypeData(genType);
synchronized (lock) {
- typedHandlersToTargetClasses.put(handler, clazz);
+ typedHandlersToTargetClasses.put(handler,
typeData);
}
+ return typeData;
} else {
- // TODO Blow Up
+ // TODO log
+ throw new IllegalArgumentException("Unable to determine handled
type for " + handler);
}
- return clazz;
}
- private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) {
- Class<?> clazz = null;
+ private Type processInterfaceHierarchyForClass(Class<?> toCheck) {
+ Type type = null;
for (Class<?> iface : toCheck.getInterfaces()) {
- clazz = findDirectlyImplemented(iface);
+ type = findDirectlyImplemented(iface);
- if(clazz != null) {
+ if(type != null) {
break;
}
- clazz = processInterfaceHierarchyForClass(iface);
+ type = processInterfaceHierarchyForClass(iface);
- if(clazz != null) {
+ if(type != null) {
break;
}
}
- return clazz;
+ return type;
}
- private Class<?> findDirectlyImplemented(Class<?> toCheck) {
+ private Type findDirectlyImplemented(Class<?> toCheck) {
return Arrays.stream(toCheck.getGenericInterfaces())
.filter(ParameterizedType.class::isInstance)
.map(ParameterizedType.class::cast)
.filter(t -> TypedEventHandler.class.equals(t.getRawType())).map(t
-> t.getActualTypeArguments()[0])
- .findFirst().map(Class.class::cast).orElse(null);
+ .findFirst().orElse(null);
}
void addUntypedEventHandler(UntypedEventHandler handler, Map<String,
Object> properties) {
@@ -380,9 +403,9 @@ public class TypedEventBusImpl implements TypedEventBus {
handler = knownTypedHandlers.get(serviceId);
}
- Class<?> clazz = discoverTypeForTypedHandler(registeringBundle,
handler, properties);
+ TypeData genType = discoverTypeForTypedHandler(registeringBundle,
handler, properties);
- String defaultTopic = clazz == null ? null :
clazz.getName().replace(".", "/");
+ String defaultTopic = genType == null ? null :
genType.getRawType().getName().replace(".", "/");
doUpdatedEventHandler(topicsToTypedHandlers,
wildcardTopicsToTypedHandlers, knownTypedHandlers, defaultTopic, properties);
}
@@ -478,21 +501,25 @@ public class TypedEventBusImpl implements TypedEventBus {
public void deliver(String topic, Object event) {
checkTopicSyntax(topic);
Objects.requireNonNull(event, "The event object must not be null");
- deliver(topic, EventConverter.forTypedEvent(event));
+ deliver(topic, event, EventConverter::forTypedEvent);
}
@Override
public void deliverUntyped(String topic, Map<String, ?> eventData) {
checkTopicSyntax(topic);
Objects.requireNonNull(eventData, "The event object must not be null");
- deliver(topic, EventConverter.forUntypedEvent(eventData));
+ deliver(topic, eventData, EventConverter::forUntypedEvent);
}
- private void deliver(String topic, EventConverter convertibleEventData) {
+ private <T> void deliver(String topic, T event,
+ BiFunction<T, CustomEventConverter, EventConverter>
eventConversionFactory) {
List<EventTask> deliveryTasks;
+ EventConverter convertibleEventData;
synchronized (lock) {
+ convertibleEventData =
eventConversionFactory.apply(event, customEventConverter);
+
List<EventTask> typedDeliveries = toTypedEventTasks(
topicsToTypedHandlers.getOrDefault(topic,
emptyMap()), topic, convertibleEventData);
@@ -728,13 +755,13 @@ public class TypedEventBusImpl implements TypedEventBus {
@Override
public void deliver(T event) {
checkOpen();
- TypedEventBusImpl.this.deliver(topic,
EventConverter.forTypedEvent(event));
+ TypedEventBusImpl.this.deliver(topic, event,
EventConverter::forTypedEvent);
}
@Override
public void deliverUntyped(Map<String, ?> event) {
checkOpen();
- TypedEventBusImpl.this.deliver(topic,
EventConverter.forUntypedEvent(event));
+ TypedEventBusImpl.this.deliver(topic, event,
EventConverter::forUntypedEvent);
}
@Override
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java
index e5a2459..40cd64c 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventTask.java
@@ -21,13 +21,13 @@ import org.osgi.service.typedevent.TypedEventHandler;
class TypedEventTask extends EventTask {
private final String topic;
- private final Class<?> targetEventClass;
+ private final TypeData targetEventClass;
private final EventConverter eventData;
private final TypedEventHandler<Object> eventProcessor;
@SuppressWarnings("unchecked")
public TypedEventTask(String topic, EventConverter eventData,
TypedEventHandler<?> eventProcessor,
- Class<?> targetEventClass) {
+ TypeData targetEventClass) {
super();
this.topic = topic;
this.targetEventClass = targetEventClass;
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
index 7525fd2..a9e0287 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
@@ -18,24 +18,29 @@ package org.apache.aries.typedevent.bus.impl;
import java.util.List;
+import org.apache.aries.typedevent.bus.spi.CustomEventConverter;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.monitor.MonitorEvent;
public class TypedHistoryReplayTask extends HistoryReplayTask {
+ private final CustomEventConverter customEventConverter;
private final TypedEventHandler<Object> handler;
- private final Class<?> eventType;
+ private final TypeData eventType;
@SuppressWarnings("unchecked")
- public TypedHistoryReplayTask(TypedEventMonitorImpl monitorImpl,
TypedEventHandler<?> handler, Class<?> eventType, List<EventSelector>
selectors, Integer history) {
+ public TypedHistoryReplayTask(TypedEventMonitorImpl monitorImpl,
+ CustomEventConverter customEventConverter,
TypedEventHandler<?> handler,
+ TypeData eventType, List<EventSelector> selectors,
Integer history) {
super(monitorImpl, selectors, history);
+ this.customEventConverter = customEventConverter;
this.handler = (TypedEventHandler<Object>) handler;
this.eventType = eventType;
}
@Override
protected void notifyListener(MonitorEvent me) {
- handler.notify(me.topic, (Object)
EventConverter.forUntypedEvent(me.eventData).toTypedEvent(eventType));
+ handler.notify(me.topic, (Object)
EventConverter.forUntypedEvent(me.eventData,
customEventConverter).toTypedEvent(eventType));
}
}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/AriesTypedEvents.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/AriesTypedEvents.java
new file mode 100644
index 0000000..f17d491
--- /dev/null
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/AriesTypedEvents.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.spi;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.service.typedevent.TypedEventBus;
+
+@ProviderType
+public interface AriesTypedEvents extends TypedEventBus {
+
+ /**
+ * Register a global event converter for this event bus. Equivalent to
+ * {@link #registerGlobalEventConverter(CustomEventConverter, boolean)}
passing
+ * <code>false</code> as the <em>force</em> flag.
+ * @param converter the converter to register
+ * @throws IllegalStateException if a converter has already been
registered
+ */
+ default public void registerGlobalEventConverter(CustomEventConverter
converter) throws IllegalStateException {
+ registerGlobalEventConverter(converter, false);
+ }
+
+ /**
+ * Register a global event converter for this event bus.
+ * @param converter - the converter to register
+ * @param force - if true then any existing converter will be replaced
+ * @throws IllegalStateException if <em>force</em> is
<code>false</code>
+ * and a converter has already been registered
+ */
+ public void registerGlobalEventConverter(CustomEventConverter
converter, boolean force) throws IllegalStateException;
+
+}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/CustomEventConverter.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/CustomEventConverter.java
new file mode 100644
index 0000000..21a50f7
--- /dev/null
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/CustomEventConverter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.spi;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A Custom Event Converter can be registered to provide customised
+ * conversion of event data.
+ */
+@ConsumerType
+public interface CustomEventConverter {
+ /**
+ * Convert the supplied event to the target type
+ * @param event - the event to convert
+ * @param rawTarget - the target class
+ * @param genericTarget - the target type, including any generics
information
+ * @return A converted event, or <code>null</code> if the event cannot
be converted
+ */
+ public <T> T toTypedEvent(Object event, Class<?> rawTarget, Type
genericTarget);
+
+ /**
+ * Convert the supplied event to an untyped event (nested maps of
scalar types)
+ * @param event - the event to convert
+ * @return A converted event, or <code>null</code> if the event cannot
be converted
+ */
+ public Map<String, Object> toUntypedEvent(Object event);
+}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/package-info.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/package-info.java
new file mode 100644
index 0000000..2937eca
--- /dev/null
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/spi/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+package org.apache.aries.typedevent.bus.spi;
\ No newline at end of file
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventConverterTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventConverterTest.java
index 98b4d19..089a3bf 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventConverterTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventConverterTest.java
@@ -17,13 +17,19 @@
package org.apache.aries.typedevent.bus.impl;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.Map;
+import org.apache.aries.typedevent.bus.spi.CustomEventConverter;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.util.converter.ConversionException;
public class EventConverterTest {
@@ -49,7 +55,15 @@ public class EventConverterTest {
return holder;
}
}
+
+ public static class ParameterizedEvent<T> {
+ public T parameterisedMessage;
+ }
+
+ public static interface IntegerTestHandler extends
TypedEventHandler<ParameterizedEvent<Integer>> {}
+ public static interface DoubleTestHandler extends
TypedEventHandler<ParameterizedEvent<Double>> {}
+
public static class DoublyNestedEventHolderWithIssues {
public NestedEventHolderNotAProperDTO event;
}
@@ -60,11 +74,12 @@ public class EventConverterTest {
TestEvent te = new TestEvent();
te.message = "FOO";
- Map<String, ?> map = EventConverter.forTypedEvent(te).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(te,
null).toUntypedEvent();
assertEquals("FOO", map.get("message"));
- TestEvent testEvent =
EventConverter.forUntypedEvent(map).toTypedEvent(TestEvent.class);
+ TestEvent testEvent = EventConverter.forUntypedEvent(map, null)
+ .toTypedEvent(new TypeData(TestEvent.class));
assertEquals(te.message, testEvent.message);
@@ -79,13 +94,14 @@ public class EventConverterTest {
NestedEventHolder holder = new NestedEventHolder();
holder.event = te;
- Map<String, ?> map =
EventConverter.forTypedEvent(holder).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(holder,
null).toUntypedEvent();
@SuppressWarnings({ "unchecked" })
Map<String, Object> nested = (Map<String, Object>) map.get("event");
assertEquals("FOO", nested.get("message"));
- NestedEventHolder testEvent =
EventConverter.forUntypedEvent(map).toTypedEvent(NestedEventHolder.class);
+ NestedEventHolder testEvent = EventConverter.forUntypedEvent(map, null)
+ .toTypedEvent(new TypeData(NestedEventHolder.class));
assertEquals(te.message, testEvent.event.message);
@@ -100,14 +116,15 @@ public class EventConverterTest {
NestedEventHolder holder = new NestedEventHolder();
holder.event = te;
- Map<String, ?> map =
EventConverter.forTypedEvent(holder).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(holder,
null).toUntypedEvent();
@SuppressWarnings("unchecked")
Map<String, Object> nested = (Map<String, Object>) map.get("event");
assertEquals("FOO", nested.get("message"));
try {
-
EventConverter.forUntypedEvent(map).toTypedEvent(DefaultVisibilityNestedEventHolder.class);
+ EventConverter.forUntypedEvent(map, null).toTypedEvent(
+ new TypeData(DefaultVisibilityNestedEventHolder.class));
fail("Should not succeed in creating a Default Visibility type");
} catch (ConversionException ce) {
assertEquals(IllegalAccessException.class,
ce.getCause().getClass());
@@ -123,14 +140,14 @@ public class EventConverterTest {
NestedEventHolderNotAProperDTO holder = new
NestedEventHolderNotAProperDTO();
holder.event = te;
- Map<String, ?> map =
EventConverter.forTypedEvent(holder).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(holder,
null).toUntypedEvent();
@SuppressWarnings("unchecked")
Map<String, Object> nested = (Map<String, Object>) map.get("event");
assertEquals("FOO", nested.get("message"));
- NestedEventHolderNotAProperDTO testEvent =
EventConverter.forUntypedEvent(map)
- .toTypedEvent(NestedEventHolderNotAProperDTO.class);
+ NestedEventHolderNotAProperDTO testEvent =
EventConverter.forUntypedEvent(map, null)
+ .toTypedEvent(new
TypeData(NestedEventHolderNotAProperDTO.class));
assertEquals(te.message, testEvent.event.message);
@@ -149,15 +166,15 @@ public class EventConverterTest {
DoublyNestedEventHolderWithIssues doubleHolder = new
DoublyNestedEventHolderWithIssues();
doubleHolder.event = holder;
- Map<String, ?> map =
EventConverter.forTypedEvent(doubleHolder).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(doubleHolder,
null).toUntypedEvent();
Map<String, Object> nested = (Map<String, Object>) map.get("event");
assertTrue(nested.containsKey("event"));
nested = (Map<String, Object>) nested.get("event");
assertEquals("FOO", nested.get("message"));
- DoublyNestedEventHolderWithIssues testEvent =
EventConverter.forUntypedEvent(map)
- .toTypedEvent(DoublyNestedEventHolderWithIssues.class);
+ DoublyNestedEventHolderWithIssues testEvent =
EventConverter.forUntypedEvent(map, null)
+ .toTypedEvent(new
TypeData(DoublyNestedEventHolderWithIssues.class));
assertEquals(te.message, testEvent.event.event.message);
@@ -172,17 +189,94 @@ public class EventConverterTest {
NestedEventHolder holder = new NestedEventHolder();
holder.event = te;
- Map<String, ?> map =
EventConverter.forTypedEvent(holder).toUntypedEvent();
+ Map<String, ?> map = EventConverter.forTypedEvent(holder,
null).toUntypedEvent();
@SuppressWarnings("unchecked")
Map<String, Object> nested = (Map<String, Object>) map.get("event");
assertEquals("FOO", nested.get("message"));
- NestedEventHolderNotAProperDTO testEvent =
EventConverter.forUntypedEvent(map)
- .toTypedEvent(NestedEventHolderNotAProperDTO.class);
+ NestedEventHolderNotAProperDTO testEvent =
EventConverter.forUntypedEvent(map, null)
+ .toTypedEvent(new
TypeData(NestedEventHolderNotAProperDTO.class));
assertEquals(te.message, testEvent.event.message);
}
+ @Test
+ public void testGenericFlattenAndReconstituteIntoADifferentType() {
+
+ ParameterizedEvent<Integer> te = new ParameterizedEvent<>();
+ te.parameterisedMessage = 42;
+
+ Type integerType =
((ParameterizedType)IntegerTestHandler.class.getGenericInterfaces()[0])
+ .getActualTypeArguments()[0];
+ Type doubleType =
((ParameterizedType)DoubleTestHandler.class.getGenericInterfaces()[0])
+ .getActualTypeArguments()[0];
+
+ EventConverter eventConverter = EventConverter.forTypedEvent(te, null);
+
+ Map<String, ?> map = eventConverter.toUntypedEvent();
+ assertEquals(42, map.get("parameterisedMessage"));
+
+ ParameterizedEvent<Double> converted = eventConverter.toTypedEvent(new
TypeData(doubleType));
+ assertEquals(42d, converted.parameterisedMessage, 0.00001);
+
+ ParameterizedEvent<Integer> testEvent = EventConverter.forUntypedEvent(
+ Map.of("parameterisedMessage", "17"), null)
+ .toTypedEvent(new TypeData(integerType));
+
+ assertEquals(17, testEvent.parameterisedMessage);
+
+ }
+
+ @Test
+ public void testCustomConverterRawTypes() {
+
+ TestEvent te = new TestEvent();
+ te.message = "FOO";
+ CustomEventConverter cec = Mockito.mock(CustomEventConverter.class);
+
+ Mockito.when(cec.toUntypedEvent(te)).thenReturn(Map.of("message",
"BAR"));
+ Mockito.when(cec.toTypedEvent(te, String.class,
String.class)).thenReturn("FOOBAR");
+ Mockito.when(cec.toTypedEvent(te, TestEvent.class,
TestEvent.class)).thenReturn("FIZZBUZZ");
+
+ EventConverter eventConverter = EventConverter.forTypedEvent(te, cec);
+
+ Map<String, ?> map = eventConverter.toUntypedEvent();
+ assertEquals("BAR", map.get("message"));
+ assertEquals("FOOBAR", eventConverter.toTypedEvent(new
TypeData(String.class)));
+
+ // The custom converter is bypassed when the target is the event's own raw type
+ assertSame(te, eventConverter.toTypedEvent(new
TypeData(TestEvent.class)));
+
+ }
+
+ @Test
+ public void testCustomConverterWithGenerics() {
+
+ ParameterizedEvent<Integer> te = new ParameterizedEvent<>();
+ te.parameterisedMessage = 42;
+
+ Type integerType =
((ParameterizedType)IntegerTestHandler.class.getGenericInterfaces()[0])
+ .getActualTypeArguments()[0];
+ Type doubleType =
((ParameterizedType)DoubleTestHandler.class.getGenericInterfaces()[0])
+ .getActualTypeArguments()[0];
+
+ CustomEventConverter cec = Mockito.mock(CustomEventConverter.class);
+
+
Mockito.when(cec.toUntypedEvent(te)).thenReturn(Map.of("parameterisedMessage",
"21"));
+ Mockito.when(cec.toTypedEvent(te, ParameterizedEvent.class,
integerType)).thenReturn(21d);
+ Mockito.when(cec.toTypedEvent(te, ParameterizedEvent.class,
doubleType)).thenReturn(63d);
+
+ EventConverter eventConverter = EventConverter.forTypedEvent(te, cec);
+
+ Map<String, ?> map = eventConverter.toUntypedEvent();
+ assertEquals("21", map.get("parameterisedMessage"));
+
+ // Never bypass as we can't easily verify the generics
+ assertEquals(21d, eventConverter.toTypedEvent(new
TypeData(integerType)), 0.00001);
+ assertEquals(63d, eventConverter.toTypedEvent(new
TypeData(doubleType)), 0.00001);
+
+ }
+
}
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
index 59dbb0d..b421adb 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.aries.typedevent.bus.common.TestEvent2;
import org.apache.aries.typedevent.bus.common.TestEvent2.EventType;
import org.apache.aries.typedevent.bus.common.TestEvent2Consumer;
import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.apache.aries.typedevent.bus.spi.AriesTypedEvents;
+import org.apache.aries.typedevent.bus.spi.CustomEventConverter;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
@@ -73,6 +76,14 @@ public class EventDeliveryIntegrationTest extends
AbstractIntegrationTest {
@Mock
UntypedEventHandler untypedEventHandler, untypedEventHandler2;
+ @Mock
+ CustomEventConverter customConverter;
+
+ @AfterEach
+ public void stop() throws Exception {
+ ((AriesTypedEvents) eventBus).registerGlobalEventConverter(null, true);
+ }
+
/**
* Tests that events are delivered to untyped Event Handlers
* based on topic
@@ -362,4 +373,41 @@ public class EventDeliveryIntegrationTest extends
AbstractIntegrationTest {
}
+ /**
+ * Tests that a registered custom event converter is used when events
+ * are delivered to untyped Event Handlers based on topic
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testCustomEventReceiving() throws InterruptedException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ ((AriesTypedEvents)
eventBus).registerGlobalEventConverter(customConverter);
+
+
Mockito.when(customConverter.toUntypedEvent(event)).thenReturn(Map.of("message",
"BOO"));
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+
+ regs.add(context.registerService(UntypedEventHandler.class,
untypedEventHandler, props));
+
+ props = new Hashtable<>();
+
+ props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_2_TOPIC);
+
+ regs.add(context.registerService(UntypedEventHandler.class,
untypedEventHandler2, props));
+
+ eventBus.deliver(event);
+
+ Mockito.verify(untypedEventHandler,
Mockito.timeout(1000)).notifyUntyped(
+ Mockito.eq(TEST_EVENT_TOPIC),
Mockito.argThat(isUntypedTestEventWithMessage("BOO")));
+
+ Mockito.verify(untypedEventHandler2,
Mockito.after(1000).never()).notifyUntyped(
+ Mockito.eq(TEST_EVENT_TOPIC),
Mockito.argThat(isUntypedTestEventWithMessage("BOO")));
+
+ }
+
}
\ No newline at end of file