This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git


The following commit(s) were added to refs/heads/main by this push:
     new 3129108  Add support for wildcard listeners
3129108 is described below

commit 3129108fb810ab0dbbc05961473a69e5491294d4
Author: Tim Ward <[email protected]>
AuthorDate: Wed Oct 12 17:13:59 2022 +0100

    Add support for wildcard listeners
---
 .../typedevent/bus/impl/TypedEventBusImpl.java     | 154 +++++++++++++++++----
 .../typedevent/bus/impl/TypedEventBusImplTest.java |  82 +++++++++++
 org.apache.aries.typedevent.bus/test.bndrun        |   6 +-
 .../test.bndrun                                    |  10 +-
 4 files changed, 216 insertions(+), 36 deletions(-)

diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index f7c5729..e055443 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -34,11 +34,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
 
 import org.osgi.annotation.bundle.Capability;
 import org.osgi.framework.Bundle;
@@ -69,6 +70,12 @@ public class TypedEventBusImpl implements TypedEventBus {
      * the map should be copied as the contents are not thread safe.
      */
     private final Map<String, Map<TypedEventHandler<?>, Filter>> 
topicsToTypedHandlers = new HashMap<>();
+  
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values 
from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final NavigableMap<String, Map<TypedEventHandler<?>, Filter>> 
wildcardTopicsToTypedHandlers = new TreeMap<>();
 
     /**
      * Map access and mutation must be synchronized on {@link #lock}. Values 
from
@@ -80,7 +87,13 @@ public class TypedEventBusImpl implements TypedEventBus {
      * Map access and mutation must be synchronized on {@link #lock}. Values 
from
      * the map should be copied as the contents are not thread safe.
      */
-    private final Map<String, Map<UntypedEventHandler, Filter>> 
topicsToUntypedHandlers = new HashMap<>();
+    private final NavigableMap<String, Map<UntypedEventHandler, Filter>> 
topicsToUntypedHandlers = new TreeMap<>();
+
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values 
from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<String, Map<UntypedEventHandler, Filter>> 
wildcardTopicsToUntypedHandlers = new HashMap<>();
 
     /**
      * List access and mutation must be synchronized on {@link #lock}.
@@ -124,7 +137,7 @@ public class TypedEventBusImpl implements TypedEventBus {
         
         String defaultTopic = clazz == null ? null : 
clazz.getName().replace(".", "/");
 
-        doAddEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, 
defaultTopic, properties);
+        doAddEventHandler(topicsToTypedHandlers, 
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, 
properties);
     }
 
     private Class<?> discoverTypeForTypedHandler(Bundle registeringBundle, 
TypedEventHandler<?> handler, Map<String, Object> properties) {
@@ -193,11 +206,11 @@ public class TypedEventBusImpl implements TypedEventBus {
     }
 
     void addUntypedEventHandler(UntypedEventHandler handler, Map<String, 
Object> properties) {
-        doAddEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, 
handler, null, properties);
+        doAddEventHandler(topicsToUntypedHandlers, 
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, handler, null, 
properties);
     }
 
-    private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, 
Map<Long, T> idMap, 
-            T handler, String defaultTopic, Map<String, Object> properties) {
+    private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, 
Map<String, Map<T, Filter>> wildcardMap, 
+               Map<Long, T> idMap, T handler, String defaultTopic, Map<String, 
Object> properties) {
 
         Object prop = properties.get(TypedEventConstants.TYPED_EVENT_TOPICS);
 
@@ -212,7 +225,17 @@ public class TypedEventBusImpl implements TypedEventBus {
         } else {
             topicList = standardConverter().convert(prop).to(LIST_OF_STRINGS);
         }
-
+        
+        topicList = topicList.stream()
+                       .filter(s -> {
+                               String msg = checkTopicSyntax(s, true);
+                               if(msg != null) {
+                                       // TODO log this
+                               }
+                               return msg == null;
+                       })
+                       .collect(toList());
+        
         Long serviceId = getServiceId(properties);
 
         Filter f;
@@ -228,10 +251,19 @@ public class TypedEventBusImpl implements TypedEventBus {
             knownHandlers.put(serviceId, topicList);
             idMap.put(serviceId, handler);
         
-            topicList.forEach(s -> {
-                Map<T, Filter> handlers = map.computeIfAbsent(s, x1 -> new 
HashMap<>());
-                handlers.put(handler, f);
-            });
+            for(String s : topicList) {
+               Map<String, Map<T, Filter>> mapToUse;
+               String topicToUse;
+               if(isWildcard(s)) {
+                       mapToUse = wildcardMap;
+                       topicToUse = s.length() == 1 ? "" : s.substring(0, 
s.length() - 2);
+               } else {
+                       mapToUse = map;
+                       topicToUse = s;
+               }
+               Map<T, Filter> handlers = mapToUse.computeIfAbsent(topicToUse, 
x1 -> new HashMap<>());
+               handlers.put(handler, f);
+            }
         }
     }
 
@@ -305,21 +337,21 @@ public class TypedEventBusImpl implements TypedEventBus {
         
         String defaultTopic = clazz == null ? null : 
clazz.getName().replace(".", "/");
         
-        doUpdatedEventHandler(topicsToTypedHandlers, knownTypedHandlers, 
defaultTopic, properties);
+        doUpdatedEventHandler(topicsToTypedHandlers, 
wildcardTopicsToTypedHandlers, knownTypedHandlers, defaultTopic, properties);
     }
 
     void updatedUntypedEventHandler(Map<String, Object> properties) {
-        doUpdatedEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, 
null, properties);
+        doUpdatedEventHandler(topicsToUntypedHandlers, 
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, null, properties);
     }
 
-    private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, 
Map<Long,T> idToHandler, String defaultTopic,
+    private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, 
Map<String, Map<T, Filter>> wildcardMap, Map<Long,T> idToHandler, String 
defaultTopic,
             Map<String, Object> properties) {
         Long serviceId = getServiceId(properties);
 
         synchronized (lock) {
             T handler = idToHandler.get(serviceId);
             doRemoveEventHandler(map, idToHandler, handler, serviceId);
-            doAddEventHandler(map, idToHandler, handler, defaultTopic, 
properties);
+            doAddEventHandler(map, wildcardMap, idToHandler, handler, 
defaultTopic, properties);
         }
     }
 
@@ -389,20 +421,34 @@ public class TypedEventBusImpl implements TypedEventBus {
 
     private void deliver(String topic, EventConverter convertibleEventData) {
 
-        List<? extends EventTask> deliveryTasks;
+        List<EventTask> deliveryTasks;
 
         synchronized (lock) {
-            Stream<EventTask> typedDeliveries = 
topicsToTypedHandlers.getOrDefault(topic, emptyMap()).entrySet()
-                    .stream().filter(e -> e.getValue() == null || 
convertibleEventData.applyFilter(e.getValue()))
-                    .map(Entry::getKey).map(handler -> new 
TypedEventTask(topic, convertibleEventData, handler,
-                            typedHandlersToTargetClasses.get(handler)));
-
-            Stream<EventTask> untypedDeliveries = 
topicsToUntypedHandlers.getOrDefault(topic, emptyMap()).entrySet()
-                    .stream().filter(e -> e.getValue() == null || 
convertibleEventData.applyFilter(e.getValue()))
-                    .map(Entry::getKey).map(handler -> new 
UntypedEventTask(topic, convertibleEventData, handler));
-
-            deliveryTasks = Stream.concat(typedDeliveries, 
untypedDeliveries).collect(toList());
-
+               List<EventTask> typedDeliveries = toTypedEventTasks(
+                               topicsToTypedHandlers.getOrDefault(topic, 
emptyMap()), topic, convertibleEventData);
+
+            List<EventTask> untypedDeliveries = toUntypedEventTasks(
+                       topicsToUntypedHandlers.getOrDefault(topic, 
emptyMap()), topic, convertibleEventData);
+
+            List<EventTask> wildcardDeliveries = new ArrayList<>();
+            String truncatedTopic = topic;
+            do {
+               int idx = truncatedTopic.lastIndexOf('/');
+               truncatedTopic = idx > 0 ? truncatedTopic.substring(0, idx) : 
"";
+               wildcardDeliveries.addAll(toTypedEventTasks(
+                               
wildcardTopicsToTypedHandlers.getOrDefault(truncatedTopic, emptyMap()), 
+                               topic, convertibleEventData));
+               wildcardDeliveries.addAll(toUntypedEventTasks(
+                               
wildcardTopicsToUntypedHandlers.getOrDefault(truncatedTopic, emptyMap()), 
+                               topic, convertibleEventData));
+            } while (truncatedTopic.length() > 0);
+
+            deliveryTasks = new ArrayList<>(typedDeliveries.size() + 
untypedDeliveries.size() + wildcardDeliveries.size());
+
+            deliveryTasks.addAll(typedDeliveries);
+            deliveryTasks.addAll(untypedDeliveries);
+            deliveryTasks.addAll(wildcardDeliveries);
+            
             if (deliveryTasks.isEmpty()) {
                 // TODO log properly
                 System.out.println("Unhandled Event Handlers are being used 
for event sent to topic " + topic);
@@ -415,8 +461,42 @@ public class TypedEventBusImpl implements TypedEventBus {
 
         queue.addAll(deliveryTasks);
     }
+    
+    private List<EventTask> toTypedEventTasks(Map<TypedEventHandler<?>, 
Filter> map, 
+               String topic, EventConverter convertibleEventData) {
+       List<EventTask> list = new ArrayList<>();
+       for(Entry<TypedEventHandler<?>, Filter> e : map.entrySet()) {
+               Filter f = e.getValue();
+               if(f == null || convertibleEventData.applyFilter(f)) {
+                       TypedEventHandler<?> handler = e.getKey();
+                       list.add(new TypedEventTask(topic, 
convertibleEventData, handler,
+                typedHandlersToTargetClasses.get(handler)));
+               }
+       }
+       return list;
+    }
+
+    private List<EventTask> toUntypedEventTasks(Map<UntypedEventHandler, 
Filter> map, 
+               String topic, EventConverter convertibleEventData) {
+       List<EventTask> list = new ArrayList<>();
+       for(Entry<UntypedEventHandler, Filter> e : map.entrySet()) {
+               Filter f = e.getValue();
+               if(f == null || convertibleEventData.applyFilter(f)) {
+                       UntypedEventHandler handler = e.getKey();
+                       list.add(new UntypedEventTask(topic, 
convertibleEventData, handler));
+               }
+       }
+       return list;
+    }
 
     private static void checkTopicSyntax(String topic) {
+       String msg = checkTopicSyntax(topic, false);
+       if(msg != null) {
+               throw new IllegalArgumentException(msg);
+       }
+    }
+    
+    private static String checkTopicSyntax(String topic, boolean 
wildcardPermitted) {
        
        if(topic == null) {
                throw new IllegalArgumentException("The topic name is not 
permitted to be null");
@@ -425,6 +505,19 @@ public class TypedEventBusImpl implements TypedEventBus {
        boolean slashPermitted = false;
        for(int i = 0; i < topic.length(); i++) {
                int c = topic.codePointAt(i);
+               if('*' == c) {
+                       if(!wildcardPermitted) {
+                               return "Wildcard topics may not be used for 
sending events";
+                       }
+                       if(topic.length() != i + 1) {
+                               return "The wildcard * is only permitted at the 
end of the topic";
+                       }
+                       if(topic.length() > 1 && topic.codePointAt(i - 1) != 
'/') {
+                               return "The wildcard must be preceded by a / 
unless it is the only character in the topic string";
+                       }
+                       continue;
+               }
+               
                if('/' == c) {
                        if(slashPermitted && i != (topic.length() - 1)) {
                                slashPermitted = false;
@@ -434,8 +527,13 @@ public class TypedEventBusImpl implements TypedEventBus {
                        slashPermitted = true;
                        continue;
                }
-               throw new IllegalArgumentException("Illegal character " + c + " 
at index " + i + " of topic string: " + topic);
+               return "Illegal character " + c + " at index " + i + " of topic 
string: " + topic;
        }
+       return null;
+    }
+    
+    private static boolean isWildcard(String topic) {
+       return topic.equals("*") || topic.endsWith("/*");
     }
     
     private class EventThread extends Thread {
diff --git 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 9566b6d..017e1be 100644
--- 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -50,6 +50,8 @@ public class TypedEventBusImplTest {
     private static final String SPECIAL_TEST_EVENT_TOPIC = 
SpecialTestEvent.class.getName().replace(".", "/");
 
     private static final String TEST_EVENT_TOPIC = 
TestEvent.class.getName().replace(".", "/");
+    private static final String TEST_EVENT_WILDCARD_TOPIC = 
"org/apache/aries/typedevent/*";
+    private static final String BASE_WILDCARD_TOPIC = "*";
 
     public static class TestEvent {
         public String message;
@@ -187,6 +189,86 @@ public class TypedEventBusImplTest {
         assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
 
     }
+
+    /**
+     * Tests that events are delivered to Smart Behaviours based on type
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWildcardEventReceiving() throws InterruptedException {
+       
+       TestEvent event = new TestEvent();
+       event.message = "boo";
+       
+       Map<String, Object> serviceProperties = new HashMap<>();
+       
+       serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_WILDCARD_TOPIC);
+       serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+       serviceProperties.put(SERVICE_ID, 42L);
+       
+       impl.addTypedEventHandler(registeringBundle, handlerA, 
serviceProperties);
+       
+       serviceProperties = new HashMap<>();
+       
+       serviceProperties.put(TYPED_EVENT_TOPICS, BASE_WILDCARD_TOPIC);
+       serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+       serviceProperties.put(SERVICE_ID, 43L);
+       
+       impl.addTypedEventHandler(registeringBundle, handlerB, 
serviceProperties);
+       
+       serviceProperties = new HashMap<>();
+       
+       serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_WILDCARD_TOPIC);
+       serviceProperties.put(SERVICE_ID, 44L);
+       
+       impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
+       
+       serviceProperties = new HashMap<>();
+       
+       serviceProperties.put(TYPED_EVENT_TOPICS, BASE_WILDCARD_TOPIC);
+       serviceProperties.put(SERVICE_ID, 45L);
+       
+       impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
+       
+       impl.deliver(event);
+       
+       assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isTestEventWithMessage("boo")));
+       
+       assertTrue(semB.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isTestEventWithMessage("boo")));
+       
+       assertTrue(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+       
+       assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+       impl.deliver("some/other/topic", event);
+       
+       assertFalse(semA.tryAcquire(1, TimeUnit.SECONDS));
+       
+       assertTrue(semB.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isTestEventWithMessage("boo")));
+       
+       assertFalse(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
+       
+       assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+       
+       
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+    }
     
     public static class TestEventHandler implements 
TypedEventHandler<TestEvent> {
 
diff --git a/org.apache.aries.typedevent.bus/test.bndrun 
b/org.apache.aries.typedevent.bus/test.bndrun
index 30c45fb..f3ec6ee 100644
--- a/org.apache.aries.typedevent.bus/test.bndrun
+++ b/org.apache.aries.typedevent.bus/test.bndrun
@@ -30,7 +30,7 @@
        ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
        ch.qos.logback.core;version='[1.2.3,1.2.4)',\
        org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
-       org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+       org.apache.aries.typedevent.bus;version='[0.0.2,0.0.3)',\
        org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
        org.apache.felix.converter;version='[1.0.14,1.0.15)',\
        org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
@@ -42,7 +42,6 @@
        junit-platform-commons;version='[1.6.2,1.6.3)',\
        net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
        net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
-       org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
        org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
        org.objenesis;version='[3.1.0,3.1.1)',\
        org.opentest4j;version='[1.2.0,1.2.1)',\
@@ -50,4 +49,5 @@
        org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
        junit-platform-engine;version='[1.6.2,1.6.3)',\
        junit-platform-launcher;version='[1.6.2,1.6.3)',\
-       junit-jupiter-engine;version='[5.6.2,5.6.3)'
\ No newline at end of file
+       junit-jupiter-engine;version='[5.6.2,5.6.3)',\
+       org.apache.aries.typedevent.bus-tests;version='[0.0.2,0.0.3)'
\ No newline at end of file
diff --git 
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
 
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
index 9729e07..dcbf3dd 100644
--- 
a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
+++ 
b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
@@ -49,8 +49,8 @@
        junit-platform-engine;version='[1.6.2,1.6.3)',\
        junit-platform-launcher;version='[1.6.2,1.6.3)',\
        junit-jupiter-engine;version='[5.6.2,5.6.3)',\
-       org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
-       
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
-       
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
-       org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
-       org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'
+       org.apache.aries.typedevent.bus;version='[0.0.2,0.0.3)',\
+       org.apache.aries.typedevent.remote.api;version='[0.0.2,0.0.3)',\
+       
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.2,0.0.3)',\
+       
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.2,0.0.3)',\
+       org.apache.aries.typedevent.remote.spi;version='[0.0.2,0.0.3)'

Reply via email to