This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/main by this push:
new 3129108 Add support for wildcard listeners
3129108 is described below
commit 3129108fb810ab0dbbc05961473a69e5491294d4
Author: Tim Ward <[email protected]>
AuthorDate: Wed Oct 12 17:13:59 2022 +0100
Add support for wildcard listeners
---
.../typedevent/bus/impl/TypedEventBusImpl.java | 154 +++++++++++++++++----
.../typedevent/bus/impl/TypedEventBusImplTest.java | 82 +++++++++++
org.apache.aries.typedevent.bus/test.bndrun | 6 +-
.../test.bndrun | 10 +-
4 files changed, 216 insertions(+), 36 deletions(-)
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index f7c5729..e055443 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -34,11 +34,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Objects;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.Bundle;
@@ -69,6 +70,12 @@ public class TypedEventBusImpl implements TypedEventBus {
* the map should be copied as the contents are not thread safe.
*/
private final Map<String, Map<TypedEventHandler<?>, Filter>>
topicsToTypedHandlers = new HashMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values
from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final NavigableMap<String, Map<TypedEventHandler<?>, Filter>>
wildcardTopicsToTypedHandlers = new TreeMap<>();
/**
* Map access and mutation must be synchronized on {@link #lock}. Values
from
@@ -80,7 +87,13 @@ public class TypedEventBusImpl implements TypedEventBus {
* Map access and mutation must be synchronized on {@link #lock}. Values
from
* the map should be copied as the contents are not thread safe.
*/
- private final Map<String, Map<UntypedEventHandler, Filter>>
topicsToUntypedHandlers = new HashMap<>();
+ private final NavigableMap<String, Map<UntypedEventHandler, Filter>>
topicsToUntypedHandlers = new TreeMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values
from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<String, Map<UntypedEventHandler, Filter>>
wildcardTopicsToUntypedHandlers = new HashMap<>();
/**
* List access and mutation must be synchronized on {@link #lock}.
@@ -124,7 +137,7 @@ public class TypedEventBusImpl implements TypedEventBus {
String defaultTopic = clazz == null ? null :
clazz.getName().replace(".", "/");
- doAddEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler,
defaultTopic, properties);
+ doAddEventHandler(topicsToTypedHandlers,
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic,
properties);
}
private Class<?> discoverTypeForTypedHandler(Bundle registeringBundle,
TypedEventHandler<?> handler, Map<String, Object> properties) {
@@ -193,11 +206,11 @@ public class TypedEventBusImpl implements TypedEventBus {
}
void addUntypedEventHandler(UntypedEventHandler handler, Map<String,
Object> properties) {
- doAddEventHandler(topicsToUntypedHandlers, knownUntypedHandlers,
handler, null, properties);
+ doAddEventHandler(topicsToUntypedHandlers,
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, handler, null,
properties);
}
- private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map,
Map<Long, T> idMap,
- T handler, String defaultTopic, Map<String, Object> properties) {
+ private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map,
Map<String, Map<T, Filter>> wildcardMap,
+ Map<Long, T> idMap, T handler, String defaultTopic, Map<String,
Object> properties) {
Object prop = properties.get(TypedEventConstants.TYPED_EVENT_TOPICS);
@@ -212,7 +225,17 @@ public class TypedEventBusImpl implements TypedEventBus {
} else {
topicList = standardConverter().convert(prop).to(LIST_OF_STRINGS);
}
-
+
+ topicList = topicList.stream()
+ .filter(s -> {
+ String msg = checkTopicSyntax(s, true);
+ if(msg != null) {
+ // TODO log this
+ }
+ return msg == null;
+ })
+ .collect(toList());
+
Long serviceId = getServiceId(properties);
Filter f;
@@ -228,10 +251,19 @@ public class TypedEventBusImpl implements TypedEventBus {
knownHandlers.put(serviceId, topicList);
idMap.put(serviceId, handler);
- topicList.forEach(s -> {
- Map<T, Filter> handlers = map.computeIfAbsent(s, x1 -> new
HashMap<>());
- handlers.put(handler, f);
- });
+ for(String s : topicList) {
+ Map<String, Map<T, Filter>> mapToUse;
+ String topicToUse;
+ if(isWildcard(s)) {
+ mapToUse = wildcardMap;
+ topicToUse = s.length() == 1 ? "" : s.substring(0,
s.length() - 2);
+ } else {
+ mapToUse = map;
+ topicToUse = s;
+ }
+ Map<T, Filter> handlers = mapToUse.computeIfAbsent(topicToUse,
x1 -> new HashMap<>());
+ handlers.put(handler, f);
+ }
}
}
@@ -305,21 +337,21 @@ public class TypedEventBusImpl implements TypedEventBus {
String defaultTopic = clazz == null ? null :
clazz.getName().replace(".", "/");
- doUpdatedEventHandler(topicsToTypedHandlers, knownTypedHandlers,
defaultTopic, properties);
+ doUpdatedEventHandler(topicsToTypedHandlers,
wildcardTopicsToTypedHandlers, knownTypedHandlers, defaultTopic, properties);
}
void updatedUntypedEventHandler(Map<String, Object> properties) {
- doUpdatedEventHandler(topicsToUntypedHandlers, knownUntypedHandlers,
null, properties);
+ doUpdatedEventHandler(topicsToUntypedHandlers,
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, null, properties);
}
- private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map,
Map<Long,T> idToHandler, String defaultTopic,
+ private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map,
Map<String, Map<T, Filter>> wildcardMap, Map<Long,T> idToHandler, String
defaultTopic,
Map<String, Object> properties) {
Long serviceId = getServiceId(properties);
synchronized (lock) {
T handler = idToHandler.get(serviceId);
doRemoveEventHandler(map, idToHandler, handler, serviceId);
- doAddEventHandler(map, idToHandler, handler, defaultTopic,
properties);
+ doAddEventHandler(map, wildcardMap, idToHandler, handler,
defaultTopic, properties);
}
}
@@ -389,20 +421,34 @@ public class TypedEventBusImpl implements TypedEventBus {
private void deliver(String topic, EventConverter convertibleEventData) {
- List<? extends EventTask> deliveryTasks;
+ List<EventTask> deliveryTasks;
synchronized (lock) {
- Stream<EventTask> typedDeliveries =
topicsToTypedHandlers.getOrDefault(topic, emptyMap()).entrySet()
- .stream().filter(e -> e.getValue() == null ||
convertibleEventData.applyFilter(e.getValue()))
- .map(Entry::getKey).map(handler -> new
TypedEventTask(topic, convertibleEventData, handler,
- typedHandlersToTargetClasses.get(handler)));
-
- Stream<EventTask> untypedDeliveries =
topicsToUntypedHandlers.getOrDefault(topic, emptyMap()).entrySet()
- .stream().filter(e -> e.getValue() == null ||
convertibleEventData.applyFilter(e.getValue()))
- .map(Entry::getKey).map(handler -> new
UntypedEventTask(topic, convertibleEventData, handler));
-
- deliveryTasks = Stream.concat(typedDeliveries,
untypedDeliveries).collect(toList());
-
+ List<EventTask> typedDeliveries = toTypedEventTasks(
+ topicsToTypedHandlers.getOrDefault(topic,
emptyMap()), topic, convertibleEventData);
+
+ List<EventTask> untypedDeliveries = toUntypedEventTasks(
+ topicsToUntypedHandlers.getOrDefault(topic,
emptyMap()), topic, convertibleEventData);
+
+ List<EventTask> wildcardDeliveries = new ArrayList<>();
+ String truncatedTopic = topic;
+ do {
+ int idx = truncatedTopic.lastIndexOf('/');
+ truncatedTopic = idx > 0 ? truncatedTopic.substring(0, idx) :
"";
+ wildcardDeliveries.addAll(toTypedEventTasks(
+
wildcardTopicsToTypedHandlers.getOrDefault(truncatedTopic, emptyMap()),
+ topic, convertibleEventData));
+ wildcardDeliveries.addAll(toUntypedEventTasks(
+
wildcardTopicsToUntypedHandlers.getOrDefault(truncatedTopic, emptyMap()),
+ topic, convertibleEventData));
+ } while (truncatedTopic.length() > 0);
+
+ deliveryTasks = new ArrayList<>(typedDeliveries.size() +
untypedDeliveries.size() + wildcardDeliveries.size());
+
+ deliveryTasks.addAll(typedDeliveries);
+ deliveryTasks.addAll(untypedDeliveries);
+ deliveryTasks.addAll(wildcardDeliveries);
+
if (deliveryTasks.isEmpty()) {
// TODO log properly
System.out.println("Unhandled Event Handlers are being used
for event sent to topic " + topic);
@@ -415,8 +461,42 @@ public class TypedEventBusImpl implements TypedEventBus {
queue.addAll(deliveryTasks);
}
+
+ private List<EventTask> toTypedEventTasks(Map<TypedEventHandler<?>,
Filter> map,
+ String topic, EventConverter convertibleEventData) {
+ List<EventTask> list = new ArrayList<>();
+ for(Entry<TypedEventHandler<?>, Filter> e : map.entrySet()) {
+ Filter f = e.getValue();
+ if(f == null || convertibleEventData.applyFilter(f)) {
+ TypedEventHandler<?> handler = e.getKey();
+ list.add(new TypedEventTask(topic,
convertibleEventData, handler,
+ typedHandlersToTargetClasses.get(handler)));
+ }
+ }
+ return list;
+ }
+
+ private List<EventTask> toUntypedEventTasks(Map<UntypedEventHandler,
Filter> map,
+ String topic, EventConverter convertibleEventData) {
+ List<EventTask> list = new ArrayList<>();
+ for(Entry<UntypedEventHandler, Filter> e : map.entrySet()) {
+ Filter f = e.getValue();
+ if(f == null || convertibleEventData.applyFilter(f)) {
+ UntypedEventHandler handler = e.getKey();
+ list.add(new UntypedEventTask(topic,
convertibleEventData, handler));
+ }
+ }
+ return list;
+ }
private static void checkTopicSyntax(String topic) {
+ String msg = checkTopicSyntax(topic, false);
+ if(msg != null) {
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ private static String checkTopicSyntax(String topic, boolean
wildcardPermitted) {
if(topic == null) {
throw new IllegalArgumentException("The topic name is not
permitted to be null");
@@ -425,6 +505,19 @@ public class TypedEventBusImpl implements TypedEventBus {
boolean slashPermitted = false;
for(int i = 0; i < topic.length(); i++) {
int c = topic.codePointAt(i);
+ if('*' == c) {
+ if(!wildcardPermitted) {
+ return "Wildcard topics may not be used for
sending events";
+ }
+ if(topic.length() != i + 1) {
+ return "The wildcard * is only permitted at the
end of the topic";
+ }
+ if(topic.length() > 1 && topic.codePointAt(i - 1) !=
'/') {
+ return "The wildcard must be preceded by a /
unless it is the only character in the topic string";
+ }
+ continue;
+ }
+
if('/' == c) {
if(slashPermitted && i != (topic.length() - 1)) {
slashPermitted = false;
@@ -434,8 +527,13 @@ public class TypedEventBusImpl implements TypedEventBus {
slashPermitted = true;
continue;
}
- throw new IllegalArgumentException("Illegal character " + c + "
at index " + i + " of topic string: " + topic);
+ return "Illegal character " + c + " at index " + i + " of topic
string: " + topic;
}
+ return null;
+ }
+
+ private static boolean isWildcard(String topic) {
+ return topic.equals("*") || topic.endsWith("/*");
}
private class EventThread extends Thread {
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 9566b6d..017e1be 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -50,6 +50,8 @@ public class TypedEventBusImplTest {
private static final String SPECIAL_TEST_EVENT_TOPIC =
SpecialTestEvent.class.getName().replace(".", "/");
private static final String TEST_EVENT_TOPIC =
TestEvent.class.getName().replace(".", "/");
+ private static final String TEST_EVENT_WILDCARD_TOPIC =
"org/apache/aries/typedevent/*";
+ private static final String BASE_WILDCARD_TOPIC = "*";
public static class TestEvent {
public String message;
@@ -187,6 +189,86 @@ public class TypedEventBusImplTest {
assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
}
+
+ /**
+ * Tests that events are delivered to typed and untyped handlers registered for wildcard topics
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testWildcardEventReceiving() throws InterruptedException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ Map<String, Object> serviceProperties = new HashMap<>();
+
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_WILDCARD_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 42L);
+
+ impl.addTypedEventHandler(registeringBundle, handlerA,
serviceProperties);
+
+ serviceProperties = new HashMap<>();
+
+ serviceProperties.put(TYPED_EVENT_TOPICS, BASE_WILDCARD_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 43L);
+
+ impl.addTypedEventHandler(registeringBundle, handlerB,
serviceProperties);
+
+ serviceProperties = new HashMap<>();
+
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_WILDCARD_TOPIC);
+ serviceProperties.put(SERVICE_ID, 44L);
+
+ impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
+
+ serviceProperties = new HashMap<>();
+
+ serviceProperties.put(TYPED_EVENT_TOPICS, BASE_WILDCARD_TOPIC);
+ serviceProperties.put(SERVICE_ID, 45L);
+
+ impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
+
+ impl.deliver(event);
+
+ assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isTestEventWithMessage("boo")));
+
+ assertTrue(semB.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isTestEventWithMessage("boo")));
+
+ assertTrue(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+ assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+ impl.deliver("some/other/topic", event);
+
+ assertFalse(semA.tryAcquire(1, TimeUnit.SECONDS));
+
+ assertTrue(semB.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isTestEventWithMessage("boo")));
+
+ assertFalse(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
+
+ assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+
+
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
'/')),
+ Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+ }
public static class TestEventHandler implements
TypedEventHandler<TestEvent> {
diff --git a/org.apache.aries.typedevent.bus/test.bndrun
b/org.apache.aries.typedevent.bus/test.bndrun
index 30c45fb..f3ec6ee 100644
--- a/org.apache.aries.typedevent.bus/test.bndrun
+++ b/org.apache.aries.typedevent.bus/test.bndrun
@@ -30,7 +30,7 @@
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
- org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+ org.apache.aries.typedevent.bus;version='[0.0.2,0.0.3)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
@@ -42,7 +42,6 @@
junit-platform-commons;version='[1.6.2,1.6.3)',\
net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
- org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
org.objenesis;version='[3.1.0,3.1.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
@@ -50,4 +49,5 @@
org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
- junit-jupiter-engine;version='[5.6.2,5.6.3)'
\ No newline at end of file
+ junit-jupiter-engine;version='[5.6.2,5.6.3)',\
+ org.apache.aries.typedevent.bus-tests;version='[0.0.2,0.0.3)'
\ No newline at end of file
diff --git
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
index 9729e07..dcbf3dd 100644
---
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
+++
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
@@ -49,8 +49,8 @@
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
junit-jupiter-engine;version='[5.6.2,5.6.3)',\
- org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
-
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
-
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
- org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
- org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'
+ org.apache.aries.typedevent.bus;version='[0.0.2,0.0.3)',\
+ org.apache.aries.typedevent.remote.api;version='[0.0.2,0.0.3)',\
+
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.2,0.0.3)',\
+
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.2,0.0.3)',\
+ org.apache.aries.typedevent.remote.spi;version='[0.0.2,0.0.3)'