This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git


The following commit(s) were added to refs/heads/feature/v1.1 by this push:
     new 81675be  Support Event Topic restrictions for UnhandledEventHandler services
81675be is described below

commit 81675be2cd4c9fa68d7fe3ddb5c6bb316beebf1c
Author: Tim Ward <[email protected]>
AuthorDate: Wed Mar 6 17:40:06 2024 +0000

    Support Event Topic restrictions for UnhandledEventHandler services
    
    As per the draft specification we should support the event.topics property for UnhandledEventHandler services.
    
    Signed-off-by: Tim Ward <[email protected]>
---
 .../typedevent/bus/impl/TypedEventBusImpl.java     | 101 ++++++++++++++++-----
 .../osgi/UnhandledEventHandlerIntegrationTest.java |  47 ++++++++++
 2 files changed, 124 insertions(+), 24 deletions(-)

diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index a056d0f..c204649 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -25,6 +25,7 @@ import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER
 import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_HISTORY;
 import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION;
 import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
 import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.lang.reflect.ParameterizedType;
@@ -40,6 +41,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 
 import org.osgi.annotation.bundle.Capability;
 import org.osgi.framework.Bundle;
@@ -97,9 +99,16 @@ public class TypedEventBusImpl implements TypedEventBus {
     private final Map<String, Map<UntypedEventHandler, EventSelector>> 
wildcardTopicsToUntypedHandlers = new HashMap<>();
 
     /**
-     * List access and mutation must be synchronized on {@link #lock}.
+     * Map access and mutation must be synchronized on {@link #lock}. Values 
from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<String, Map<UnhandledEventHandler, EventSelector>> 
topicsToUnhandledHandlers = new HashMap<>();
+
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values 
from
+     * the map should be copied as the contents are not thread safe.
      */
-    private final List<UnhandledEventHandler> unhandledEventHandlers = new 
ArrayList<>();
+    private final Map<String, Map<UnhandledEventHandler, EventSelector>> 
wildcardTopicsToUnhandledHandlers = new HashMap<>();
 
     /**
      * Map access and mutation must be synchronized on {@link #lock}. Values 
from
@@ -119,6 +128,12 @@ public class TypedEventBusImpl implements TypedEventBus {
      */
     private final Map<Long, UntypedEventHandler> knownUntypedHandlers = new 
HashMap<>();
 
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values 
from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<Long, UnhandledEventHandler> knownUnhandledHandlers = 
new HashMap<>();
+
     private final BlockingQueue<EventTask> queue = new LinkedBlockingQueue<>();
 
     /**
@@ -394,15 +409,31 @@ public class TypedEventBusImpl implements TypedEventBus {
     }
 
     void addUnhandledEventHandler(UnhandledEventHandler handler, Map<String, 
Object> properties) {
-        synchronized (lock) {
-            unhandledEventHandlers.add(handler);
-        }
+       properties = clearHistoryAndAddTopic(properties);
+       
+       doAddEventHandler(topicsToUnhandledHandlers, 
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, handler, null, 
properties, null);
+    }
+
+       private Map<String, Object> clearHistoryAndAddTopic(Map<String, Object> 
properties) {
+               if(properties.containsKey(TYPED_EVENT_HISTORY) || 
!properties.containsKey(TYPED_EVENT_TOPICS)) {
+               // TODO log warning
+               properties = new HashMap<>(properties);
+               properties.remove(TYPED_EVENT_HISTORY);
+               properties.put(TYPED_EVENT_TOPICS, "*");
+       }
+               return properties;
+       }
+    
+    void updatedUnhandledEventHandler(Map<String, Object> properties) {
+       properties = clearHistoryAndAddTopic(properties);
+       
+        doUpdatedEventHandler(topicsToUnhandledHandlers, 
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, null, properties);
     }
 
     void removeUnhandledEventHandler(UnhandledEventHandler handler, 
Map<String, Object> properties) {
-        synchronized (lock) {
-            unhandledEventHandlers.remove(handler);
-        }
+       Long serviceId = getServiceId(properties);
+
+        doRemoveEventHandler(topicsToUnhandledHandlers, 
wildcardTopicsToUnhandledHandlers, knownUnhandledHandlers, handler, serviceId);
     }
 
     void start() {
@@ -468,19 +499,12 @@ public class TypedEventBusImpl implements TypedEventBus {
             List<EventTask> untypedDeliveries = toUntypedEventTasks(
                        topicsToUntypedHandlers.getOrDefault(topic, 
emptyMap()), topic, convertibleEventData);
 
-            List<EventTask> wildcardDeliveries = new ArrayList<>();
-            String truncatedTopic = topic;
-            do {
-               int idx = truncatedTopic.lastIndexOf('/', 
truncatedTopic.length() - 2);
-               truncatedTopic = idx > 0 ? truncatedTopic.substring(0, idx + 1) 
: "";
-               wildcardDeliveries.addAll(toTypedEventTasks(
-                               
wildcardTopicsToTypedHandlers.getOrDefault(truncatedTopic, emptyMap()), 
-                               topic, convertibleEventData));
-               wildcardDeliveries.addAll(toUntypedEventTasks(
-                               
wildcardTopicsToUntypedHandlers.getOrDefault(truncatedTopic, emptyMap()), 
-                               topic, convertibleEventData));
-            } while (truncatedTopic.length() > 0);
-
+            List<EventTask> wildcardDeliveries = findWildcardTasks(topic, 
convertibleEventData,
+                       s -> 
toTypedEventTasks(wildcardTopicsToTypedHandlers.getOrDefault(s, emptyMap()), 
+                                       topic, convertibleEventData),
+                       s -> 
toUntypedEventTasks(wildcardTopicsToUntypedHandlers.getOrDefault(s, 
emptyMap()), 
+                                       topic, convertibleEventData));
+            
             deliveryTasks = new ArrayList<>(typedDeliveries.size() + 
untypedDeliveries.size() + wildcardDeliveries.size());
 
             deliveryTasks.addAll(typedDeliveries);
@@ -490,16 +514,33 @@ public class TypedEventBusImpl implements TypedEventBus {
             if (deliveryTasks.isEmpty()) {
                 // TODO log properly
                 System.out.println("Unhandled Event Handlers are being used 
for event sent to topic " + topic);
-                deliveryTasks = unhandledEventHandlers.stream()
-                        .map(handler -> new UnhandledEventTask(topic, 
convertibleEventData, handler)).collect(toList());
+                
deliveryTasks.addAll(toUnhandledEventTasks(topicsToUnhandledHandlers.getOrDefault(topic,
 emptyMap()), 
+                               topic, convertibleEventData));
+                deliveryTasks.addAll(findWildcardTasks(topic, 
convertibleEventData, 
+                               s -> 
toUnhandledEventTasks(wildcardTopicsToUnhandledHandlers.getOrDefault(s, 
emptyMap()), 
+                                               topic, convertibleEventData)));
             }
             // This occurs inside the lock to ensure history replay doesn't 
miss events
             queue.add(new MonitorEventTask(topic, convertibleEventData, 
monitorImpl));
         }
 
-
         queue.addAll(deliveryTasks);
     }
+
+       @SafeVarargs
+       private final List<EventTask> findWildcardTasks(String topic, 
EventConverter convertibleEventData,
+                       Function<String, List<? extends EventTask>>... 
additions) {
+               List<EventTask> wildcardDeliveries = new ArrayList<>();
+               String truncatedTopic = topic;
+               do {
+                       int idx = truncatedTopic.lastIndexOf('/', 
truncatedTopic.length() - 2);
+                       truncatedTopic = idx > 0 ? truncatedTopic.substring(0, 
idx + 1) : "";
+                       for(Function<String, List<? extends EventTask>> f : 
additions) {
+                               
wildcardDeliveries.addAll(f.apply(truncatedTopic));
+                       }
+               } while (truncatedTopic.length() > 0);
+               return wildcardDeliveries;
+       }
     
     private List<EventTask> toTypedEventTasks(Map<TypedEventHandler<?>, 
EventSelector> map, 
                String topic, EventConverter convertibleEventData) {
@@ -526,6 +567,18 @@ public class TypedEventBusImpl implements TypedEventBus {
        return list;
     }
 
+    private List<EventTask> toUnhandledEventTasks(Map<UnhandledEventHandler, 
EventSelector> map, 
+               String topic, EventConverter convertibleEventData) {
+       List<EventTask> list = new ArrayList<>();
+       for(Entry<UnhandledEventHandler, EventSelector> e : map.entrySet()) {
+               if(e.getValue().matches(topic, convertibleEventData)) {
+                       UnhandledEventHandler handler = e.getKey();
+                       list.add(new UnhandledEventTask(topic, 
convertibleEventData, handler));
+               }
+       }
+       return list;
+    }
+
     static void checkTopicSyntax(String topic) {
        String msg = checkTopicSyntax(topic, false);
        if(msg != null) {
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
index 5a8675e..c633122 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
 
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -34,6 +35,7 @@ import 
org.apache.aries.typedevent.bus.common.TestEventConsumer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.typedevent.TypedEventBus;
@@ -71,6 +73,9 @@ public class UnhandledEventHandlerIntegrationTest extends 
AbstractIntegrationTes
 
     @Mock
     UnhandledEventHandler unhandledEventHandler;
+    
+    @Mock
+    UnhandledEventHandler unhandledEventHandlerB;
 
     /**
      * Tests that the unhandledEventHandler gets called appropriately
@@ -149,5 +154,47 @@ public class UnhandledEventHandlerIntegrationTest extends 
AbstractIntegrationTes
                 argThat(isUntypedTestEventWithMessage("bar")));
         
     }
+
+    /**
+     * Tests that the consumer of last resort gets called appropriately
+     * @throws InterruptedException
+     */
+    @Test
+    public void testTopicFilterUnhandled() throws InterruptedException {
+       
+       Dictionary<String, Object> props = new Hashtable<>();
+       props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+       
+       regs.add(context.registerService(UnhandledEventHandler.class, 
unhandledEventHandler, props));
+       
+       props = new Hashtable<>();
+       
+       regs.add(context.registerService(UnhandledEventHandler.class, 
unhandledEventHandlerB, props));
+       
+       
+       TestEvent event = new TestEvent();
+       event.message = "foo";
+       
+       eventBus.deliver(TEST_EVENT_TOPIC, event);
+       
+       verify(unhandledEventHandler, 
timeout(1000)).notifyUnhandled(eq(TEST_EVENT_TOPIC), 
+                       argThat(isUntypedTestEventWithMessage("foo")));
+       
+       verify(unhandledEventHandlerB, 
timeout(1000)).notifyUnhandled(anyString(), 
argThat(isUntypedTestEventWithMessage("foo")));
+       
+       
+       event = new TestEvent();
+       event.message = "bar";
+       
+       
+       eventBus.deliver(TEST_EVENT_2_TOPIC, event);
+       
+       verify(unhandledEventHandler, 
after(1000).never()).notifyUnhandled(eq(TEST_EVENT_2_TOPIC), 
+                       Mockito.any());
+       
+       verify(unhandledEventHandlerB, 
timeout(1000)).notifyUnhandled(eq(TEST_EVENT_2_TOPIC), 
+                       argThat(isUntypedTestEventWithMessage("bar")));
+       
+    }
     
 }
\ No newline at end of file

Reply via email to