This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/feature/v1.1 by this push:
new 81675be Support Event Topic restrictions for UnhandledEventHandler
services
81675be is described below
commit 81675be2cd4c9fa68d7fe3ddb5c6bb316beebf1c
Author: Tim Ward <[email protected]>
AuthorDate: Wed Mar 6 17:40:06 2024 +0000
Support Event Topic restrictions for UnhandledEventHandler services
As per the draft specification we should support the event.topics property
for UnhandledEventHandler services.
Signed-off-by: Tim Ward <[email protected]>
---
.../typedevent/bus/impl/TypedEventBusImpl.java | 101 ++++++++++++++++-----
.../osgi/UnhandledEventHandlerIntegrationTest.java | 47 ++++++++++
2 files changed, 124 insertions(+), 24 deletions(-)
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index a056d0f..c204649 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -25,6 +25,7 @@ import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER
import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_HISTORY;
import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION;
import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION;
+import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
import static org.osgi.util.converter.Converters.standardConverter;
import java.lang.reflect.ParameterizedType;
@@ -40,6 +41,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
+import java.util.function.Function;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.Bundle;
@@ -97,9 +99,16 @@ public class TypedEventBusImpl implements TypedEventBus {
private final Map<String, Map<UntypedEventHandler, EventSelector>>
wildcardTopicsToUntypedHandlers = new HashMap<>();
/**
- * List access and mutation must be synchronized on {@link #lock}.
+ * Map access and mutation must be synchronized on {@link #lock}. Values
from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<String, Map<UnhandledEventHandler, EventSelector>>
topicsToUnhandledHandlers = new HashMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values
from
+ * the map should be copied as the contents are not thread safe.
*/
- private final List<UnhandledEventHandler> unhandledEventHandlers = new
ArrayList<>();
+ private final Map<String, Map<UnhandledEventHandler, EventSelector>>
wildcardTopicsToUnhandledHandlers = new HashMap<>();
/**
* Map access and mutation must be synchronized on {@link #lock}. Values
from
@@ -119,6 +128,12 @@ public class TypedEventBusImpl implements TypedEventBus {
*/
private final Map<Long, UntypedEventHandler> knownUntypedHandlers = new
HashMap<>();
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values
from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<Long, UnhandledEventHandler> knownUnhandledHandlers =
new HashMap<>();
+
private final BlockingQueue<EventTask> queue = new LinkedBlockingQueue<>();
/**
@@ -394,15 +409,31 @@ public class TypedEventBusImpl implements TypedEventBus {
}
void addUnhandledEventHandler(UnhandledEventHandler handler, Map<String,
Object> properties) {
- synchronized (lock) {
- unhandledEventHandlers.add(handler);
- }
+ properties = clearHistoryAndAddTopic(properties);
+
+ doAddEventHandler(topicsToUnhandledHandlers,
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, handler, null,
properties, null);
+ }
+
+ private Map<String, Object> clearHistoryAndAddTopic(Map<String, Object>
properties) {
+ if(properties.containsKey(TYPED_EVENT_HISTORY) ||
!properties.containsKey(TYPED_EVENT_TOPICS)) {
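+ // TODO log warning. NOTE(review): when TYPED_EVENT_HISTORY is present this branch also replaces an explicit TYPED_EVENT_TOPICS value with "*" - confirm that is intended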
+ properties = new HashMap<>(properties);
+ properties.remove(TYPED_EVENT_HISTORY);
+ properties.put(TYPED_EVENT_TOPICS, "*");
+ }
+ return properties;
+ }
+
+ void updatedUnhandledEventHandler(Map<String, Object> properties) {
+ properties = clearHistoryAndAddTopic(properties);
+
+ doUpdatedEventHandler(topicsToUnhandledHandlers,
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, null, properties);
}
void removeUnhandledEventHandler(UnhandledEventHandler handler,
Map<String, Object> properties) {
- synchronized (lock) {
- unhandledEventHandlers.remove(handler);
- }
+ Long serviceId = getServiceId(properties);
+
+ doRemoveEventHandler(topicsToUnhandledHandlers,
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, handler, serviceId);
}
void start() {
@@ -468,19 +499,12 @@ public class TypedEventBusImpl implements TypedEventBus {
List<EventTask> untypedDeliveries = toUntypedEventTasks(
topicsToUntypedHandlers.getOrDefault(topic,
emptyMap()), topic, convertibleEventData);
- List<EventTask> wildcardDeliveries = new ArrayList<>();
- String truncatedTopic = topic;
- do {
- int idx = truncatedTopic.lastIndexOf('/',
truncatedTopic.length() - 2);
- truncatedTopic = idx > 0 ? truncatedTopic.substring(0, idx + 1)
: "";
- wildcardDeliveries.addAll(toTypedEventTasks(
-
wildcardTopicsToTypedHandlers.getOrDefault(truncatedTopic, emptyMap()),
- topic, convertibleEventData));
- wildcardDeliveries.addAll(toUntypedEventTasks(
-
wildcardTopicsToUntypedHandlers.getOrDefault(truncatedTopic, emptyMap()),
- topic, convertibleEventData));
- } while (truncatedTopic.length() > 0);
-
+ List<EventTask> wildcardDeliveries = findWildcardTasks(topic,
convertibleEventData,
+ s ->
toTypedEventTasks(wildcardTopicsToTypedHandlers.getOrDefault(s, emptyMap()),
+ topic, convertibleEventData),
+ s ->
toUntypedEventTasks(wildcardTopicsToUntypedHandlers.getOrDefault(s,
emptyMap()),
+ topic, convertibleEventData));
+
deliveryTasks = new ArrayList<>(typedDeliveries.size() +
untypedDeliveries.size() + wildcardDeliveries.size());
deliveryTasks.addAll(typedDeliveries);
@@ -490,16 +514,33 @@ public class TypedEventBusImpl implements TypedEventBus {
if (deliveryTasks.isEmpty()) {
// TODO log properly
System.out.println("Unhandled Event Handlers are being used
for event sent to topic " + topic);
- deliveryTasks = unhandledEventHandlers.stream()
- .map(handler -> new UnhandledEventTask(topic,
convertibleEventData, handler)).collect(toList());
+
deliveryTasks.addAll(toUnhandledEventTasks(topicsToUnhandledHandlers.getOrDefault(topic,
emptyMap()),
+ topic, convertibleEventData));
+ deliveryTasks.addAll(findWildcardTasks(topic,
convertibleEventData,
+ s ->
toUnhandledEventTasks(wildcardTopicsToUnhandledHandlers.getOrDefault(s,
emptyMap()),
+ topic, convertibleEventData)));
}
// This occurs inside the lock to ensure history replay doesn't
miss events
queue.add(new MonitorEventTask(topic, convertibleEventData,
monitorImpl));
}
-
queue.addAll(deliveryTasks);
}
+
+ @SafeVarargs
+ private final List<EventTask> findWildcardTasks(String topic,
EventConverter convertibleEventData,
+ Function<String, List<? extends EventTask>>...
additions) {
+ List<EventTask> wildcardDeliveries = new ArrayList<>();
+ String truncatedTopic = topic;
+ do {
+ int idx = truncatedTopic.lastIndexOf('/',
truncatedTopic.length() - 2);
+ truncatedTopic = idx > 0 ? truncatedTopic.substring(0,
idx + 1) : "";
+ for(Function<String, List<? extends EventTask>> f :
additions) {
+
wildcardDeliveries.addAll(f.apply(truncatedTopic));
+ }
+ } while (truncatedTopic.length() > 0);
+ return wildcardDeliveries;
+ }
private List<EventTask> toTypedEventTasks(Map<TypedEventHandler<?>,
EventSelector> map,
String topic, EventConverter convertibleEventData) {
@@ -526,6 +567,18 @@ public class TypedEventBusImpl implements TypedEventBus {
return list;
}
+ private List<EventTask> toUnhandledEventTasks(Map<UnhandledEventHandler,
EventSelector> map,
+ String topic, EventConverter convertibleEventData) {
+ List<EventTask> list = new ArrayList<>();
+ for(Entry<UnhandledEventHandler, EventSelector> e : map.entrySet()) {
+ if(e.getValue().matches(topic, convertibleEventData)) {
+ UnhandledEventHandler handler = e.getKey();
+ list.add(new UnhandledEventTask(topic,
convertibleEventData, handler));
+ }
+ }
+ return list;
+ }
+
static void checkTopicSyntax(String topic) {
String msg = checkTopicSyntax(topic, false);
if(msg != null) {
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
index 5a8675e..c633122 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.after;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -34,6 +35,7 @@ import
org.apache.aries.typedevent.bus.common.TestEventConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.framework.BundleContext;
import org.osgi.service.typedevent.TypedEventBus;
@@ -71,6 +73,9 @@ public class UnhandledEventHandlerIntegrationTest extends
AbstractIntegrationTes
@Mock
UnhandledEventHandler unhandledEventHandler;
+
+ @Mock
+ UnhandledEventHandler unhandledEventHandlerB;
/**
* Tests that the unhandledEventHandler gets called appropriately
@@ -149,5 +154,47 @@ public class UnhandledEventHandlerIntegrationTest extends
AbstractIntegrationTes
argThat(isUntypedTestEventWithMessage("bar")));
}
+
+ /**
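+ * Tests that a handler registered with TYPED_EVENT_TOPICS is only notified of unhandled events on that topic, while a handler registered without it is notified for both topics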
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTopicFilterUnhandled() throws InterruptedException {
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+
+ regs.add(context.registerService(UnhandledEventHandler.class,
unhandledEventHandler, props));
+
+ props = new Hashtable<>();
+
+ regs.add(context.registerService(UnhandledEventHandler.class,
unhandledEventHandlerB, props));
+
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ eventBus.deliver(TEST_EVENT_TOPIC, event);
+
+ verify(unhandledEventHandler,
timeout(1000)).notifyUnhandled(eq(TEST_EVENT_TOPIC),
+ argThat(isUntypedTestEventWithMessage("foo")));
+
+ verify(unhandledEventHandlerB,
timeout(1000)).notifyUnhandled(anyString(),
argThat(isUntypedTestEventWithMessage("foo")));
+
+
+ event = new TestEvent();
+ event.message = "bar";
+
+
+ eventBus.deliver(TEST_EVENT_2_TOPIC, event);
+
+ verify(unhandledEventHandler,
after(1000).never()).notifyUnhandled(eq(TEST_EVENT_2_TOPIC),
+ Mockito.any());
+
+ verify(unhandledEventHandlerB,
timeout(1000)).notifyUnhandled(eq(TEST_EVENT_2_TOPIC),
+ argThat(isUntypedTestEventWithMessage("bar")));
+
+ }
}
\ No newline at end of file