This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git


The following commit(s) were added to refs/heads/feature/v1.1 by this push:
     new 4e16796  Add support for replaying history to registered listeners
4e16796 is described below

commit 4e16796a9af8aceb80e091078aba40435390545d
Author: Tim Ward <[email protected]>
AuthorDate: Mon Sep 11 15:46:08 2023 +0100

    Add support for replaying history to registered listeners
    
    This commit updates the implementation to support the event.history service
    property for event handler services. Event history is replayed up to the number
    of requested historical events. Note that to efficiently handle this the event
    history buffer had to be re-ordered to be most recent event first rather than
    oldest event first.
    
    Signed-off-by: Tim Ward <[email protected]>
---
 .../aries/typedevent/bus/impl/EventSelector.java   |   6 +
 .../typedevent/bus/impl/HistoryReplayTask.java     |  62 ++++++++++
 .../typedevent/bus/impl/TypedEventBusImpl.java     |  38 +++++-
 .../typedevent/bus/impl/TypedEventMonitorImpl.java | 133 ++++++++++++---------
 .../bus/impl/TypedHistoryReplayTask.java           |  41 +++++++
 .../bus/impl/UntypedHistoryReplayTask.java         |  38 ++++++
 .../typedevent/bus/impl/EventSelectorTest.java     |   3 +-
 .../typedevent/bus/impl/TypedEventBusImplTest.java | 103 ++++++++++++++++
 8 files changed, 360 insertions(+), 64 deletions(-)

diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
index aa44052..39ce712 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
@@ -18,6 +18,7 @@ package org.apache.aries.typedevent.bus.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 
 import org.osgi.framework.Filter;
@@ -114,6 +115,11 @@ public class EventSelector {
                // Must match the topic, and the filter if set
                return topicMatcher.test(topic) && (filter == null || 
event.applyFilter(filter));
        }
+
+       public boolean matches(String topic, Map<String, Object> event) {
+               // Must match the topic, and the filter if set
+               return topicMatcher.test(topic) && (filter == null || 
filter.matches(event));
+       }
        
        private boolean topicMatch(String topic) {
                
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/HistoryReplayTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/HistoryReplayTask.java
new file mode 100644
index 0000000..f0940e2
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/HistoryReplayTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.impl;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.ListIterator;
+import java.util.stream.Stream;
+
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+
+public abstract class HistoryReplayTask extends EventTask {
+
+       private final TypedEventMonitorImpl monitorImpl;
+       private final List<EventSelector> selectors;
+       private final Integer history;
+
+       public <T> HistoryReplayTask(TypedEventMonitorImpl monitorImpl, 
List<EventSelector> selectors,
+                       Integer history) {
+                               this.monitorImpl = monitorImpl;
+                               this.selectors = selectors;
+                               this.history = history;
+       }
+
+       @Override
+       public void notifyListener() {
+               // In most recent first order
+               List<MonitorEvent> events = 
monitorImpl.copyOfHistory(this::filterHistory);
+               
+               ListIterator<MonitorEvent> li = 
events.listIterator(events.size());
+               
+               while(li.hasPrevious()) {
+                       notifyListener(li.previous());
+               }
+       }
+       
+       private List<MonitorEvent> filterHistory(Stream<MonitorEvent> s) {
+               return 
s.filter(this::selected).limit(history).collect(toList());
+       }
+       
+       private boolean selected(MonitorEvent me) {
+               return selectors.stream().anyMatch(es -> es.matches(me.topic, 
me.eventData));
+       }
+       
+       protected abstract void notifyListener(MonitorEvent me);
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index 2d7926d..e221e2f 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toList;
 import static 
org.osgi.namespace.implementation.ImplementationNamespace.IMPLEMENTATION_NAMESPACE;
 import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_HISTORY;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION;
 import static org.osgi.util.converter.Converters.standardConverter;
@@ -38,6 +39,7 @@ import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 
 import org.osgi.annotation.bundle.Capability;
 import org.osgi.framework.Bundle;
@@ -136,7 +138,8 @@ public class TypedEventBusImpl implements TypedEventBus {
         
         String defaultTopic = clazz == null ? null : 
clazz.getName().replace(".", "/");
 
-        doAddEventHandler(topicsToTypedHandlers, 
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, 
properties);
+        doAddEventHandler(topicsToTypedHandlers, 
wildcardTopicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, 
properties,
+                       (l,i) -> new TypedHistoryReplayTask(monitorImpl, 
handler, clazz, l, i));
     }
 
     private Class<?> discoverTypeForTypedHandler(Bundle registeringBundle, 
TypedEventHandler<?> handler, Map<String, Object> properties) {
@@ -205,11 +208,13 @@ public class TypedEventBusImpl implements TypedEventBus {
     }
 
     void addUntypedEventHandler(UntypedEventHandler handler, Map<String, 
Object> properties) {
-        doAddEventHandler(topicsToUntypedHandlers, 
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, handler, null, 
properties);
+        doAddEventHandler(topicsToUntypedHandlers, 
wildcardTopicsToUntypedHandlers, knownUntypedHandlers, handler, null, 
properties,
+                       (l,i) -> new UntypedHistoryReplayTask(monitorImpl, 
handler, l, i));
     }
 
     private <T> void doAddEventHandler(Map<String, Map<T, EventSelector>> map, 
Map<String, Map<T, EventSelector>> wildcardMap, 
-               Map<Long, T> idMap, T handler, String defaultTopic, Map<String, 
Object> properties) {
+               Map<Long, T> idMap, T handler, String defaultTopic, Map<String, 
Object> properties,
+               BiFunction<List<EventSelector>, Integer, ? extends EventTask> 
historicalEvents) {
 
         Object prop = properties.get(TypedEventConstants.TYPED_EVENT_TOPICS);
 
@@ -246,10 +251,22 @@ public class TypedEventBusImpl implements TypedEventBus {
             return;
         }
 
+        Integer history = 0;
+        prop = properties.get(TYPED_EVENT_HISTORY);
+        if(prop != null) {
+               try {
+                       history = Integer.parseInt(String.valueOf(prop));
+               } catch (NumberFormatException nfe) {
+                       // TODO log a bad history property
+               }
+        }
+        
         synchronized (lock) {
             knownHandlers.put(serviceId, topicList);
             idMap.put(serviceId, handler);
         
+            List<EventSelector> selectors = new ArrayList<>(topicList.size());
+            
             for(String s : topicList) {
                Map<String, Map<T, EventSelector>> mapToUse;
                String topicToUse;
@@ -265,6 +282,10 @@ public class TypedEventBusImpl implements TypedEventBus {
                }
                Map<T, EventSelector> handlers = 
mapToUse.computeIfAbsent(topicToUse, x1 -> new HashMap<>());
                handlers.put(handler, selector);
+               selectors.add(selector);
+            }
+            if(history > 0) {
+               queue.add(historicalEvents.apply(selectors, history));
             }
         }
     }
@@ -353,7 +374,13 @@ public class TypedEventBusImpl implements TypedEventBus {
         synchronized (lock) {
             T handler = idToHandler.get(serviceId);
             doRemoveEventHandler(map, idToHandler, handler, serviceId);
-            doAddEventHandler(map, wildcardMap, idToHandler, handler, 
defaultTopic, properties);
+            doAddEventHandler(map, wildcardMap, idToHandler, handler, 
defaultTopic, properties,
+                       (a,b) -> new EventTask() {
+                                               @Override
+                                               public void notifyListener() {
+                                                       // no-op as history 
will already have been played
+                                               }
+                                       });
         }
     }
 
@@ -457,9 +484,10 @@ public class TypedEventBusImpl implements TypedEventBus {
                 deliveryTasks = unhandledEventHandlers.stream()
                         .map(handler -> new UnhandledEventTask(topic, 
convertibleEventData, handler)).collect(toList());
             }
+            // This occurs inside the lock to ensure history replay doesn't 
miss events
+            queue.add(new MonitorEventTask(topic, convertibleEventData, 
monitorImpl));
         }
 
-        queue.add(new MonitorEventTask(topic, convertibleEventData, 
monitorImpl));
 
         queue.addAll(deliveryTasks);
     }
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
index 309f384..1d177fe 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
@@ -18,6 +18,8 @@
 package org.apache.aries.typedevent.bus.impl;
 
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
@@ -25,12 +27,17 @@ import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
 import org.osgi.annotation.bundle.Capability;
 import org.osgi.namespace.service.ServiceNamespace;
 import org.osgi.service.typedevent.monitor.MonitorEvent;
 import org.osgi.service.typedevent.monitor.TypedEventMonitor;
 import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
 import org.osgi.util.pushstream.PushEventSource;
 import org.osgi.util.pushstream.PushStream;
 import org.osgi.util.pushstream.PushStreamProvider;
@@ -45,7 +52,7 @@ public class TypedEventMonitorImpl implements TypedEventMonitor {
 
     private final ExecutorService monitoringWorker;
 
-    private final Object lock = new Object();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private final PushStreamProvider psp;
 
@@ -53,7 +60,7 @@ public class TypedEventMonitorImpl implements TypedEventMonitor {
 
     private final int historySize = 1024;
 
-    public TypedEventMonitorImpl(Map<String, ?> props) {
+    TypedEventMonitorImpl(Map<String, ?> props) {
 
         monitoringWorker = Executors.newCachedThreadPool();
 
@@ -62,25 +69,28 @@ public class TypedEventMonitorImpl implements TypedEventMonitor {
                 .withQueuePolicy(QueuePolicyOption.BLOCK).build();
     }
 
-    public void destroy() {
+    void destroy() {
         source.close();
         monitoringWorker.shutdown();
     }
 
-    public void event(String topic, Map<String, Object> eventData) {
+    void event(String topic, Map<String, Object> eventData) throws 
InterruptedException {
         MonitorEvent me = new MonitorEvent();
         me.eventData = eventData;
         me.topic = topic;
         me.publicationTime = Instant.now();
 
-        synchronized (lock) {
-            historicEvents.add(me);
-            int toRemove = historicEvents.size() - historySize;
-            for (; toRemove > 0; toRemove--) {
-                historicEvents.poll();
-            }
-            source.publish(me);
+        lock.writeLock().lockInterruptibly();
+        try {
+               historicEvents.addFirst(me);
+               int toRemove = historicEvents.size() - historySize;
+               for (; toRemove > 0; toRemove--) {
+                       historicEvents.removeLast();
+               }
+        } finally {
+               lock.writeLock().unlock();
         }
+        source.publish(me);
     }
 
     @Override
@@ -106,60 +116,67 @@ public class TypedEventMonitorImpl implements TypedEventMonitor {
     PushEventSource<MonitorEvent> eventSource(int events) {
 
         return pec -> {
-            synchronized (lock) {
-
-                int size = historicEvents.size();
-                int start = Math.max(0, size - events);
-
-                List<MonitorEvent> list = historicEvents.subList(start, size);
-
-                for (MonitorEvent me : list) {
-                    try {
-                        if (pec.accept(PushEvent.data(me)) < 0) {
-                            return () -> {
-                            };
-                        }
-                    } catch (Exception e) {
-                        return () -> {
-                        };
-                    }
-                }
-                return source.open(pec);
-            }
-
+               List<MonitorEvent> list;
+
+               lock.readLock().lockInterruptibly();
+               try {
+                       int toSend = Math.min(historicEvents.size(), events);
+                       list = new ArrayList<>(historicEvents.subList(0, 
toSend));
+               } finally {
+                       lock.readLock().unlock();
+               }
+               return pushBackwards(pec, list);
         };
     }
 
     PushEventSource<MonitorEvent> eventSource(Instant since) {
 
         return pec -> {
-            synchronized (lock) {
-
-                ListIterator<MonitorEvent> it = historicEvents.listIterator();
-
-                while (it.hasNext()) {
-                    MonitorEvent next = it.next();
-                    if (next.publicationTime.isAfter(since)) {
-                        it.previous();
-                        break;
-                    }
-                }
-
-                while (it.hasNext()) {
-                    try {
-                        if (pec.accept(PushEvent.data(it.next())) < 0) {
-                            return () -> {
-                            };
-                        }
-                    } catch (Exception e) {
-                        return () -> {
-                        };
-                    }
-                }
-                return source.open(pec);
-            }
-
+               List<MonitorEvent> list = new ArrayList<>();
+               lock.readLock().lockInterruptibly();
+               try {
+                       Iterator<MonitorEvent> it = historicEvents.iterator();
+                       while(it.hasNext()) {
+                               MonitorEvent next = it.next();
+                               if (!next.publicationTime.isAfter(since)) {
+                                       break;
+                               } else {
+                                       list.add(next);
+                               }
+                       }
+               } finally {
+                       lock.readLock().unlock();
+               }
+               return pushBackwards(pec, list);
         };
     }
 
+       private AutoCloseable pushBackwards(PushEventConsumer<? super 
MonitorEvent> pec, List<MonitorEvent> list)
+                       throws Exception {
+               ListIterator<MonitorEvent> li = list.listIterator(list.size());
+               while (li.hasPrevious()) {
+                       try {
+                               if (pec.accept(PushEvent.data(li.previous())) < 
0) {
+                                       return () -> {
+                                       };
+                               }
+                       } catch (Exception e) {
+                               return () -> {
+                               };
+                       }
+               }
+               return source.open(pec);
+       }
+
+    <T> T copyOfHistory(Function<Stream<MonitorEvent>, T> events) {
+       lock.readLock().lock();
+       try {
+               Stream<MonitorEvent> s = historicEvents.stream();
+               T t = events.apply(s);
+               s.close();
+               return t;
+       } finally {
+               lock.readLock().unlock();
+       }
+    }
 }
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
new file mode 100644
index 0000000..7525fd2
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedHistoryReplayTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.impl;
+
+import java.util.List;
+
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+
+public class TypedHistoryReplayTask extends HistoryReplayTask {
+
+       private final TypedEventHandler<Object> handler;
+       private final Class<?> eventType;
+
+       @SuppressWarnings("unchecked")
+       public TypedHistoryReplayTask(TypedEventMonitorImpl monitorImpl, 
TypedEventHandler<?> handler, Class<?> eventType, List<EventSelector> 
selectors, Integer history) {
+               super(monitorImpl, selectors, history);
+               this.handler = (TypedEventHandler<Object>) handler;
+               this.eventType = eventType;
+       }
+
+       @Override
+       protected void notifyListener(MonitorEvent me) {
+               handler.notify(me.topic, (Object) 
EventConverter.forUntypedEvent(me.eventData).toTypedEvent(eventType));
+       }
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedHistoryReplayTask.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedHistoryReplayTask.java
new file mode 100644
index 0000000..08bf89b
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/UntypedHistoryReplayTask.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.impl;
+
+import java.util.List;
+
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+
+public class UntypedHistoryReplayTask extends HistoryReplayTask {
+
+       private final UntypedEventHandler handler;
+
+       public UntypedHistoryReplayTask(TypedEventMonitorImpl monitorImpl, 
UntypedEventHandler handler, List<EventSelector> selectors, Integer history) {
+               super(monitorImpl, selectors, history);
+               this.handler = handler;
+       }
+
+       @Override
+       protected void notifyListener(MonitorEvent me) {
+               handler.notifyUntyped(me.topic, me.eventData);
+       }
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventSelectorTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventSelectorTest.java
index ba6b152..21d4e0f 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventSelectorTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/EventSelectorTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -51,7 +52,7 @@ public class EventSelectorTest {
     @ParameterizedTest
     @MethodSource("getTopicMatchingData")
     public void testTopicMatching(String topic, String topicFilter, boolean 
expectedResult) {
-        assertEquals(expectedResult, new EventSelector(topicFilter, 
null).matches(topic, null));
+        assertEquals(expectedResult, new EventSelector(topicFilter, 
null).matches(topic, new HashMap<>()));
     }
     
     /**
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index dbf2895..b7058f6 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mock.Strictness.LENIENT;
 import static org.osgi.framework.Constants.SERVICE_ID;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_HISTORY;
 import static 
org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
 import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TYPE;
 import static org.osgi.util.converter.Converters.standardConverter;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -732,6 +734,107 @@ public class TypedEventBusImplTest {
         assertThrows(IllegalStateException.class, () -> 
publisher.deliverUntyped(eventMap));
     }
 
+    /**
+     * Tests that filtering is applied when delivering historical events
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testEventHistoryFiltering() throws InterruptedException {
+
+       TestEvent event = new TestEvent();
+       event.message = "foo";
+       
+       impl.deliver(event);
+       
+       event = new TestEvent();
+        event.message = "bar";
+
+        impl.deliver(event);
+        
+        event = new TestEvent();
+        event.message = "foobar";
+
+        impl.deliver(event);
+
+        event = new TestEvent();
+        event.message = "barfoo";
+        
+        impl.deliver(event);
+       
+       Map<String, Object> serviceProperties = new HashMap<>();
+
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo*)");
+        serviceProperties.put(TYPED_EVENT_HISTORY, 4);
+        serviceProperties.put(SERVICE_ID, 42L);
+
+        impl.addTypedEventHandler(registeringBundle, handlerA, 
serviceProperties);
+        
+        assertTrue(semA.tryAcquire(2, 1, TimeUnit.SECONDS));
+        assertFalse(semA.tryAcquire(1, TimeUnit.SECONDS));
+        
+        InOrder order = Mockito.inOrder(handlerA);
+        
order.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.', 
'/')),
+                Mockito.argThat(isTestEventWithMessage("foo")));
+        
order.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.', 
'/')),
+                       Mockito.argThat(isTestEventWithMessage("foobar")));
+
+        serviceProperties = new HashMap<>();
+
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, 
"(|(message=bar*)(message=foo*))");
+        serviceProperties.put(TYPED_EVENT_HISTORY, 3);
+        serviceProperties.put(SERVICE_ID, 43L);
+
+        impl.addTypedEventHandler(registeringBundle, handlerB, 
serviceProperties);
+
+        assertTrue(semB.tryAcquire(3, 1, TimeUnit.SECONDS));
+        assertFalse(semB.tryAcquire(1, TimeUnit.SECONDS));
+        
+        order = Mockito.inOrder(handlerB);
+        
order.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.', 
'/')),
+                       Mockito.argThat(isTestEventWithMessage("bar")));
+        
order.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.', 
'/')),
+                       Mockito.argThat(isTestEventWithMessage("foobar")));
+        
order.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.', 
'/')),
+                       Mockito.argThat(isTestEventWithMessage("barfoo")));
+        
+        serviceProperties = new HashMap<>();
+
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_FILTER, 
"(|(message=foo)(message=bar))");
+        serviceProperties.put(TYPED_EVENT_HISTORY, 4);
+        serviceProperties.put(SERVICE_ID, 44L);
+
+        impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
+
+        assertTrue(untypedSemA.tryAcquire(2, 1, TimeUnit.SECONDS));
+        assertFalse(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
+        
+        
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isUntypedTestEventWithMessage("foo")));
+        
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                       Mockito.argThat(isUntypedTestEventWithMessage("bar")));
+
+        serviceProperties = new HashMap<>();
+
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=*)");
+        serviceProperties.put(TYPED_EVENT_HISTORY, 1);
+        serviceProperties.put(SERVICE_ID, 45L);
+
+        impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
+
+        assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+        assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
+
+        
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.',
 '/')),
+                Mockito.argThat(isUntypedTestEventWithMessage("barfoo")));
+    }
+
     ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
         return new ArgumentMatcher<TestEvent>() {
 

Reply via email to