This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git

commit e6d4fdcd47e9a2d1e835f780107917962240b3f7
Author: Tim Ward <[email protected]>
AuthorDate: Wed Mar 6 12:00:25 2024 +0000

    Add support for Event History Configuration
    
    This commit updates the TypedEventMonitor API and implements the necessary 
history configuration methods. There are tests demonstrating that history can 
be dynamically reconfigured on a per-topic basis.
    
    Signed-off-by: Tim Ward <[email protected]>
---
 org.apache.aries.typedevent.bus/pom.xml            |   5 +-
 .../aries/typedevent/bus/impl/EventSelector.java   |  82 +++++++-
 .../aries/typedevent/bus/impl/TopicHistory.java    |  73 +++++++
 .../typedevent/bus/impl/TypedEventBusImpl.java     |   4 +-
 .../typedevent/bus/impl/TypedEventMonitorImpl.java | 210 ++++++++++++++++++++-
 .../typedevent/monitor/TypedEventMonitor.java      | 156 +++++++++++++++
 .../bus/osgi/TypedEventMonitorIntegrationTest.java | 165 ++++++++++++++++
 org.apache.aries.typedevent.bus/test.bndrun        |   3 +-
 typedevent-test-bom/pom.xml                        |   6 +
 9 files changed, 690 insertions(+), 14 deletions(-)

diff --git a/org.apache.aries.typedevent.bus/pom.xml 
b/org.apache.aries.typedevent.bus/pom.xml
index 2ee877a..fc6c662 100644
--- a/org.apache.aries.typedevent.bus/pom.xml
+++ b/org.apache.aries.typedevent.bus/pom.xml
@@ -82,6 +82,10 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.test.junit5</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.test.junit5.cm</artifactId>
+        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
@@ -114,7 +118,6 @@
                        <groupId>org.osgi</groupId>
                        <artifactId>org.osgi.util.function</artifactId>
                        <version>1.1.0</version>
-                       <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>org.osgi</groupId>
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
index 39ce712..ca3f6ad 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java
@@ -23,8 +23,10 @@ import java.util.function.Predicate;
 
 import org.osgi.framework.Filter;
 
-public class EventSelector {
+public class EventSelector implements Comparable<EventSelector> {
 
+       private final String topicFilter;
+       
        /** The event filter **/
        private final Filter filter;
        
@@ -62,6 +64,7 @@ public class EventSelector {
         * @param filter
         */
        public EventSelector(String topic, Filter filter) {
+               this.topicFilter = topic;
                this.filter = filter;
                
                if(topic == null) {
@@ -120,6 +123,10 @@ public class EventSelector {
                // Must match the topic, and the filter if set
                return topicMatcher.test(topic) && (filter == null || 
filter.matches(event));
        }
+
+       public boolean matchesTopic(String topic) {
+               return topicMatcher.test(topic);
+       }
        
        private boolean topicMatch(String topic) {
                
@@ -159,4 +166,77 @@ public class EventSelector {
        public String getInitial() {
                return initial;
        }
+       
+       /**
+        * Get the topic filter
+        * @return
+        */
+       public String getTopicFilter() {
+               return topicFilter;
+       }
+       
+       public boolean isWildcard() {
+               return isMultiLevelWildcard || !additionalSegments.isEmpty();
+       }
+
+       @Override
+       public int compareTo(EventSelector o) {
+               if(isWildcard()) { 
+                       if(!o.isWildcard()) {
+                               return 1;
+                       }
+               } else if(o.isWildcard()) {
+                       return -1;
+               } else {
+                       return initial.compareTo(o.initial);
+               }
+               
+               int compare = tokenCount(o.initial) - tokenCount(initial);
+               
+               for(int i = 0; compare == 0 && i < additionalSegments.size(); 
i++) {
+                       if(o.additionalSegments.size() > i) {
+                               compare = 
tokenCount(o.additionalSegments.get(i)) - tokenCount(additionalSegments.get(i));
+                       } else {
+                               // other is out of segments before we are
+                               return 1;
+                       }
+               }
+               
+               if(compare == 0) {
+                       
+                       if(o.additionalSegments.size() > 
additionalSegments.size()) {
+                               return -1;
+                       }
+                       
+                       if(isMultiLevelWildcard) {
+                               if(!o.isMultiLevelWildcard) {
+                                       return 1;
+                               }
+                       } else if(o.isMultiLevelWildcard) {
+                               return -1;
+                       }
+                       
+                       compare = initial.compareTo(o.initial);
+                       
+                       for(int i = 0; compare == 0 && i < 
additionalSegments.size(); i++) {
+                               compare = 
additionalSegments.get(i).compareTo(o.additionalSegments.get(i));
+                       }
+               }
+               
+               return compare;
+       }
+       
+       private int tokenCount(String s) {
+               int count;
+               if("/".equals(s)) {
+                       count = 0;
+               } else {
+                       count = 1;
+                       int idx = 0;
+                       while((idx = s.indexOf('/', idx + 1)) > 0) {
+                               count++;
+                       }
+               }
+               return count;
+       }
 }
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
new file mode 100644
index 0000000..8e5703e
--- /dev/null
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.typedevent.bus.impl;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+
+public class TopicHistory {
+    private final int minRequired;
+    private final int maxRequired;
+    // Oldest first
+    private final Deque<MonitorEvent> events;
+    
+    public TopicHistory(int minRequired, int maxRequired) {
+               this.minRequired = minRequired;
+               this.maxRequired = maxRequired;
+               events = new ArrayDeque<MonitorEvent>(maxRequired);
+       }
+
+       public MonitorEvent addEvent(MonitorEvent event) {
+       MonitorEvent toRemove = events.size() == maxRequired ? 
events.removeFirst() : null;
+       events.offerLast(event);
+       return toRemove;
+    }
+    
+       public boolean clearEvent(MonitorEvent event) {
+               if(events.size() > minRequired) {
+                       MonitorEvent last = events.remove();
+                       if(last != event) {
+                               // TODO log a warning?
+                       }
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+       
+       public boolean policyMatches(Entry<Integer, Integer> policy) {
+               return policy.getKey().intValue() == minRequired && 
policy.getValue().intValue() == maxRequired;
+       }
+
+       public List<MonitorEvent> copyFrom(TopicHistory oldHistory) {
+               List<MonitorEvent> list = new 
ArrayList<MonitorEvent>(oldHistory.events);
+
+               int toCopy = Math.min(list.size(), maxRequired - events.size());
+               int newSize = list.size() - toCopy;
+               for(int i = 0 ; i < toCopy; i++) {
+                       events.offerLast(list.remove(newSize));
+               }
+               return list;
+       }
+}
\ No newline at end of file
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index 1a73d73..2f7d360 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -526,14 +526,14 @@ public class TypedEventBusImpl implements TypedEventBus {
        return list;
     }
 
-    private static void checkTopicSyntax(String topic) {
+    static void checkTopicSyntax(String topic) {
        String msg = checkTopicSyntax(topic, false);
        if(msg != null) {
                throw new IllegalArgumentException(msg);
        }
     }
     
-    private static String checkTopicSyntax(String topic, boolean 
wildcardPermitted) {
+    static String checkTopicSyntax(String topic, boolean wildcardPermitted) {
        
        if(topic == null) {
                throw new IllegalArgumentException("The topic name is not 
permitted to be null");
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
index 0e2d0b7..4909206 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
@@ -18,17 +18,24 @@
 package org.apache.aries.typedevent.bus.impl;
 
 import java.time.Instant;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
@@ -36,6 +43,7 @@ import org.osgi.annotation.bundle.Capability;
 import org.osgi.namespace.service.ServiceNamespace;
 import org.osgi.service.typedevent.monitor.MonitorEvent;
 import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.util.function.Predicate;
 import org.osgi.util.pushstream.PushEvent;
 import org.osgi.util.pushstream.PushEventConsumer;
 import org.osgi.util.pushstream.PushEventSource;
@@ -48,6 +56,8 @@ import org.osgi.util.pushstream.SimplePushEventSource;
 @Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, attribute = 
"objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor",
 uses = TypedEventMonitor.class)
 public class TypedEventMonitorImpl implements TypedEventMonitor {
 
+       private static final Entry<Integer, Integer> EMPTY = new 
SimpleImmutableEntry<>(0,0);
+       
     private final LinkedList<MonitorEvent> historicEvents = new 
LinkedList<MonitorEvent>();
 
     private final ExecutorService monitoringWorker;
@@ -59,9 +69,18 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
     private final SimplePushEventSource<MonitorEvent> source;
 
     private final int historySize = 1024;
+    
+    private final SortedMap<EventSelector, Entry<Integer, Integer>> 
historyConfiguration = new TreeMap<>();
+    
+    private final Map<String, TopicHistory> topicsWithRestrictedHistories = 
new HashMap<>();
 
     TypedEventMonitorImpl(Map<String, ?> props) {
 
+       Object object = props.get("event.history.enable.at.start");
+       if(object == null || "true".equals(object.toString())) {
+               historyConfiguration.put(new EventSelector("*", null), new 
SimpleImmutableEntry<>(0, Integer.MAX_VALUE));
+       }
+       
         monitoringWorker = Executors.newCachedThreadPool();
 
         psp = new PushStreamProvider();
@@ -75,24 +94,57 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
     }
 
     void event(String topic, Map<String, Object> eventData) throws 
InterruptedException {
-        MonitorEvent me = new MonitorEvent();
-        me.eventData = eventData;
-        me.topic = topic;
-        me.publicationTime = Instant.now();
 
+       MonitorEvent me = null;
         lock.writeLock().lockInterruptibly();
         try {
-               historicEvents.addFirst(me);
-               int toRemove = historicEvents.size() - historySize;
-               for (; toRemove > 0; toRemove--) {
-                       historicEvents.removeLast();
+               Entry<Integer, Integer> policy = 
doGetEffectiveHistoryStorage(topic);
+               
+               if(policy.getValue() > 0) {
+                               me = getMonitorEvent(topic, eventData);
+                       
+                       historicEvents.addFirst(me);
+
+                       if(policy.getValue() < historySize) {
+                               TopicHistory th = 
topicsWithRestrictedHistories.computeIfAbsent(topic, 
+                                               t -> new 
TopicHistory(policy.getKey(), policy.getValue()));
+                               MonitorEvent old = th.addEvent(me);
+                               if(old != null) {
+                                       historicEvents.remove(old);
+                               }
+                       }
+                       
+
+                       int toRemove = historicEvents.size() - historySize;
+                       if(toRemove > 0) {
+                               Iterator<MonitorEvent> it = 
historicEvents.descendingIterator();
+                               for (; toRemove > 0 && it.hasNext();) {
+                                       MonitorEvent toCheck = it.next();
+                                       TopicHistory th = 
topicsWithRestrictedHistories.get(toCheck.topic);
+                                       if(th == null || 
th.clearEvent(toCheck)) {
+                                               it.remove();
+                                               toRemove--;
+                                       }
+                               }
+                       }
                }
         } finally {
                lock.writeLock().unlock();
         }
-        source.publish(me);
+        
+        if(source.isConnected()) {
+               source.publish(me == null? getMonitorEvent(topic, eventData) : 
me);
+        }
     }
 
+       private MonitorEvent getMonitorEvent(String topic, Map<String, Object> 
eventData) {
+               MonitorEvent me = new MonitorEvent();
+               me.eventData = eventData;
+               me.topic = topic;
+               me.publicationTime = Instant.now();
+               return me;
+       }
+
     @Override
     public PushStream<MonitorEvent> monitorEvents() {
         return monitorEvents(0);
@@ -196,4 +248,144 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
                lock.readLock().unlock();
        }
     }
+
+       @Override
+       public Predicate<String> topicFilterMatches(String topicFilter) {
+               TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
+               EventSelector selector = new EventSelector(topicFilter, null);
+               return selector::matchesTopic;
+       }
+
+       @Override
+       public boolean topicFilterMatches(String topicName, String topicFilter) 
{
+               TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
+               TypedEventBusImpl.checkTopicSyntax(topicName);
+               EventSelector selector = new EventSelector(topicFilter, null);
+               return selector.matchesTopic(topicName);
+       }
+
+       @Override
+       public long getMaximumEventStorage() {
+               return historySize;
+       }
+
+       @Override
+       public Map<String, Entry<Integer, Integer>> 
getConfiguredHistoryStorage() {
+               Map<String, Entry<Integer, Integer>> copy = new 
LinkedHashMap<>();
+               lock.readLock().lock();
+               try {
+                       for (Entry<EventSelector, Entry<Integer, Integer>> e : 
historyConfiguration.entrySet()) {
+                               copy.put(e.getKey().getTopicFilter(), 
e.getValue());
+                       }
+               } finally {
+                       lock.readLock().unlock();
+               }
+               return copy;
+       }
+
+       @Override
+       public Entry<Integer, Integer> getConfiguredHistoryStorage(String 
topicFilter) {
+               TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
+               EventSelector selector = new EventSelector(topicFilter, null);
+               lock.readLock().lock();
+               try {
+                       return historyConfiguration.get(selector);
+               } finally {
+                       lock.readLock().unlock();
+               }
+       }
+
+       @Override
+       public Entry<Integer, Integer> getEffectiveHistoryStorage(String 
topicName) {
+               TypedEventBusImpl.checkTopicSyntax(topicName);
+               lock.readLock().lock();
+               try {
+                       return doGetEffectiveHistoryStorage(topicName);
+               } finally {
+                       lock.readLock().unlock();
+               }
+       }
+
+       private Entry<Integer, Integer> doGetEffectiveHistoryStorage(String 
topicName) {
+               return historyConfiguration.entrySet().stream()
+                               .filter(e -> e.getKey().matchesTopic(topicName))
+                               .map(Entry::getValue)
+                               .findFirst()
+                               .orElse(EMPTY);
+       }
+
+       @Override
+       public int configureHistoryStorage(String topicFilter, int minRequired, 
int maxRequired) {
+               
+               if(minRequired < 0 || maxRequired < 0) {
+                       throw new IllegalArgumentException("The minimum and 
maxium stored events must be greater than zero");
+               }
+               if(minRequired > maxRequired) {
+                       throw new IllegalArgumentException("The minimum stored 
events must not be greater than the maximum stored events");
+               }
+               
+               if(minRequired > 0) {
+                       TypedEventBusImpl.checkTopicSyntax(topicFilter);
+               } else {
+                       TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
+               }
+               
+               EventSelector key = new EventSelector(topicFilter, null);
+               Entry<Integer, Integer> val = new 
SimpleImmutableEntry<>(minRequired, maxRequired);
+               long available;
+               lock.writeLock().lock();
+               try {
+                       available = historySize - 
historyConfiguration.entrySet().stream()
+                                       .filter(e -> 
!e.getKey().getTopicFilter().equals(topicFilter))
+                                       .mapToLong(e -> 
e.getValue().getKey()).sum();
+                       if(available < minRequired) {
+                               throw new IllegalStateException("Insufficient 
space available for " + minRequired + " events");
+                       }
+                       
+                       Entry<Integer, Integer> old = 
historyConfiguration.put(key, val);
+                       if(key.isWildcard()) {
+                               if(!val.equals(old)) {
+                                       
+                                       Consumer<String> action = (minRequired 
> 0 || maxRequired < historySize) ?
+                                                       s -> 
updateRestrictedHistory(s, minRequired, maxRequired) :
+                                                       
topicsWithRestrictedHistories::remove;
+                                       for(Entry<String, TopicHistory> e : 
topicsWithRestrictedHistories.entrySet()) {
+                                               
if(key.matchesTopic(e.getKey())) {
+                                                       Entry<Integer, Integer> 
policy = getEffectiveHistoryStorage(e.getKey());
+                                                       
if(!e.getValue().policyMatches(policy)) {
+                                                               
action.accept(e.getKey());
+                                                       }
+                                               }
+                                       }
+                               }
+                       } else if(minRequired > 0 || maxRequired < historySize){
+                               updateRestrictedHistory(topicFilter, 
minRequired, maxRequired);
+                       } else {
+                               
topicsWithRestrictedHistories.remove(topicFilter);
+                       }
+               } finally {
+                       lock.writeLock().unlock();
+               }
+               return (int) Math.min(maxRequired, available);
+       }
+
+       private void updateRestrictedHistory(String topicFilter, int 
minRequired, int maxRequired) {
+               TopicHistory newHistory = new TopicHistory(minRequired, 
maxRequired);
+               TopicHistory oldHistory = 
topicsWithRestrictedHistories.put(topicFilter, newHistory);
+               if(oldHistory != null) {
+                       List<MonitorEvent> toRemove = 
newHistory.copyFrom(oldHistory);
+                       historicEvents.removeAll(toRemove);
+               }
+       }
+
+       @Override
+       public void removeHistoryStorage(String topicFilter) {
+               EventSelector selector = new EventSelector(topicFilter, null);
+               lock.readLock().lock();
+               try {
+                       historyConfiguration.remove(selector);
+               } finally {
+                       lock.readLock().unlock();
+               }
+       }
 }
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
index 37a0372..5331125 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
@@ -18,8 +18,11 @@
 package org.osgi.service.typedevent.monitor;
 
 import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.function.Predicate;
 import org.osgi.util.pushstream.PushStream;
 
 /**
@@ -94,4 +97,157 @@ public interface TypedEventMonitor {
        PushStream<MonitorEvent> monitorEvents(Instant history,
                        boolean historyOnly);
 
+       /**
+        * Get a Predicate which will match the supplied topic filter against a
+        * topic name.
+        * 
+        * @param topicFilter The topic filter to match against
+        * @return A predicate that will return true if the topic name being 
tested
+        *         matches the supplied topic filter.
+        * @since 1.1
+        * @throws NullPointerException if the topic filter is <code>null</code>
+        * @throws IllegalArgumentException if the topic filter contains invalid
+        *             syntax
+        */
+       Predicate<String> topicFilterMatches(String topicFilter);
+
+       /**
+        * Test the supplied topic filter against the supplied topic name.
+        * 
+        * @param topicName The topic name to match against
+        * @param topicFilter The topic filter to match against
+        * @return <code>true</code> if the supplied topic name matches the
+        *         supplied topic filter, <code>false</code> otherwise.
+        * @since 1.1
+        * @throws NullPointerException if the topic filter is <code>null</code>
+        * @throws IllegalArgumentException if the topic filter or topic name
+        *             contain invalid syntax
+        */
+       boolean topicFilterMatches(String topicName, String topicFilter);
+
+       /**
+        * Get an estimate of the maximum number of historic events that can be
+        * stored by the TypedEvent implementation. If there is no fixed limit 
then
+        * -1 is returned. If no history storage is supported then zero is 
returned.
+        * 
+        * @return The maximum number of historic events that can be stored.
+        * @since 1.1
+        */
+       long getMaximumEventStorage();
+
+       /**
+        * Get the configured history storage for the Typed Events 
implementation.
+        * <p>
+        * The returned {@link Map} uses topic filter strings as keys. These 
filter
+        * strings may contain wildcards. If multiple filter strings are able to
+        * match then the most specific match applies with the following 
ordering:
+        * <ol>
+        * <li>An exact topic match</li>
+        * <li>An exact match of the parent topic and a single level wildcard 
as the
+        * final token</li>
+        * <li>An exact match of the parent topic and multi-level wildcard as 
the
+        * final token</li>
+        * </ol>
+        * <p>
+        * This ordering is applied recursively starting with the first topic 
token
+        * until only one candidate remains. The keys in the returned map are
+        * ordered such that the first encountered key which matches a given 
topic
+        * name is the configuration that will apply to that topic name.
+        * <p>
+        * The value associated with each key is an {@link Entry} where the key 
is
+        * the minimum required number of stored events for the topic and the 
value
+        * is the maximum number of events that will be stored.
+        *
+        * @return The configured history storage
+        * @since 1.1
+        */
+       Map<String,Entry<Integer,Integer>> getConfiguredHistoryStorage();
+
+       /**
+        * Get the configured history storage for a given topic filter. This 
method
+        * looks for an exact match in the history configuration. If no
+        * configuration is found for the supplied topic filter then
+        * <code>null</code> is returned.
+        * 
+        * @param topicFilter the topic filter
+        * @return An {@link Entry} where the key is the minimum required 
number of
+        *         stored events for the topic and the value is the maximum 
number
+        *         of events that will be stored. If no configuration is set 
for the
+        *         topic filter then <code>null</code> will be returned.
+        * @throws NullPointerException if the topic filter is <code>null</code>
+        * @throws IllegalArgumentException if the topic filter contains invalid
+        *             syntax
+        * @since 1.1
+        */
+       Entry<Integer,Integer> getConfiguredHistoryStorage(String topicFilter);
+
+       /**
+        * Get the history storage rule that applies to a given topic name. This
+        * method takes into account the necessary precedence rules to find the
+        * correct configuration for the named topic, and so will never return
+        * <code>null</code>.
+        * 
+        * @param topicName the topic name
+        * @return An {@link Entry} where the key is the minimum required 
number of
+        *         stored events for the topic and the value is the maximum 
number
+        *         of events that will be stored. If no configuration is set 
for the
+        *         topic filter then an entry with key and value set to zero 
will be
+        *         returned.
+        * @throws NullPointerException if the topic name is <code>null</code>
+        * @throws IllegalArgumentException if the topic name contains invalid
+        *             syntax or wildcards
+        * @since 1.1
+        */
+       Entry<Integer,Integer> getEffectiveHistoryStorage(String topicName);
+
+       /**
+        * Configure history storage for a given topic filter.
+        * <p>
+        * Minimum storage settings may only be set for exact matches. It is an
+        * error to use a filter containing wildcards with a non-zero minimum
+        * history requirement.
+        * <p>
+        * If a minimum storage requirement is set then the Typed Events
+        * implementation must guarantee sufficient storage space to hold those
+        * events. If, after accounting for all other pre-existing minimum 
storage
+        * requirements, there is insufficient storage left for this new
+        * configuration then an {@link IllegalStateException} must be thrown.
+        * 
+        * @param topicFilter the topic filter
+        * @param minRequired the minimum number of historical events to keep
+        *            available for this filter
+        * @param maxRequired the maximum number of historical events to keep
+        *            available for this filter
+        * @return An int indicating the number of events that can be kept for 
this
+        *         topic given the current configuration. This will always be at
+        *         least <code>minRequired</code> and at most
+        *         <code>maxRequired</code>.
+        * @throws NullPointerException if the topic filter is <code>null</code>
+        * @throws IllegalArgumentException if:
+        *             <ul>
+        *             <li>The topic filter contains invalid syntax</li>
+        *             <li><code>minRequired</code> or <code>maxRequired</code> 
are
+        *             less than <code>0</code>.</li>
+        *             <li>The topic filter contains wildcard(s) and
+        *             <code>minRequired</code> is not <code>0</code>.</li>
+        *             <li><code>minRequired</code> is greater than
+        *             <code>maxRequired</code></li></ul>
+        * @throws IllegalStateException if there is insufficient available 
space to
+        *             provide the additional <code>minRequired</code> stored
+        *             events.
+        * @since 1.1
+        */
+       int configureHistoryStorage(String topicFilter, int minRequired,
+                       int maxRequired);
+
+       /**
+        * Delete history storage configuration for a given topic filter.
+        * 
+        * @param topicFilter the topic filter
+        * @throws NullPointerException if the topic filter is <code>null</code>
+        * @throws IllegalArgumentException if the topic filter contains invalid
+        *             syntax
+        * @since 1.1
+        */
+       void removeHistoryStorage(String topicFilter);
 }
diff --git 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
index 8632823..fed370a 100644
--- 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
+++ 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.aries.typedevent.bus.osgi;
 
+import static java.util.stream.Collectors.toList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -26,9 +27,12 @@ import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEvent2;
+import org.apache.aries.typedevent.bus.common.TestEvent2.EventType;
 import org.apache.aries.typedevent.bus.common.TestEventConsumer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -45,6 +49,10 @@ import org.osgi.service.typedevent.monitor.MonitorEvent;
 import org.osgi.service.typedevent.monitor.TypedEventMonitor;
 import org.osgi.test.common.annotation.InjectBundleContext;
 import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.common.annotation.Property;
+import org.osgi.test.common.annotation.Property.Scalar;
+import org.osgi.test.common.annotation.config.WithConfiguration;
+import org.osgi.test.junit5.cm.ConfigurationExtension;
 import org.osgi.test.junit5.context.BundleContextExtension;
 import org.osgi.test.junit5.service.ServiceExtension;
 import org.osgi.util.promise.Promise;
@@ -53,6 +61,7 @@ import org.osgi.util.promise.Promise;
 @ExtendWith(BundleContextExtension.class)
 @ExtendWith(ServiceExtension.class)
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest {
 
     @Mock
@@ -424,4 +433,160 @@ public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest {
        assertTrue(events.isEmpty());
     }
 
+    
+    /**
+     * Checks that event history can be enabled for a single topic at runtime.
+     * <p>
+     * The test runs with the <code>event.history.enable.at.start</code>
+     * configuration property set to <code>false</code>. It first asserts that
+     * both values of the effective history storage for the test topic are
+     * <code>0</code> and that events delivered in that state are not replayed.
+     * It then calls <code>configureHistoryStorage(TEST_EVENT_TOPIC, 0, 1)</code>,
+     * delivers two more events and expects only the most recent one to be
+     * replayed.
+     */
+    @WithConfiguration(pid = "org.apache.aries.typedevent.bus",
+            properties = @Property(key = "event.history.enable.at.start", value = "false", scalar = Scalar.Boolean))
+    @Test
+    public void testTopicConfig(@InjectService TypedEventMonitor monitor,
+            @InjectService TypedEventBus eventBus) throws Exception {
+
+        // Nothing has been configured for the topic yet
+        Entry<Integer, Integer> effective = monitor.getEffectiveHistoryStorage(TEST_EVENT_TOPIC);
+        assertEquals(0, effective.getKey());
+        assertEquals(0, effective.getValue());
+
+        for (String message : Arrays.asList("boo", "bam")) {
+            TestEvent unrecorded = new TestEvent();
+            unrecorded.message = message;
+            eventBus.deliver(unrecorded);
+        }
+
+        Thread.sleep(500);
+
+        List<MonitorEvent> replayed = monitor.monitorEvents(5, true)
+                .collect(Collectors.toList())
+                .timeout(2000)
+                .getValue();
+        assertTrue(replayed.isEmpty());
+
+        // Keep at most one event for the topic from now on
+        monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 0, 1);
+
+        for (String message : Arrays.asList("boo", "bam")) {
+            TestEvent recorded = new TestEvent();
+            recorded.message = message;
+            eventBus.deliver(recorded);
+        }
+
+        Thread.sleep(500);
+
+        replayed = monitor.monitorEvents(5, true)
+                .collect(Collectors.toList())
+                .timeout(2000)
+                .getValue();
+
+        assertEquals(1, replayed.size(), replayed.toString());
+
+        MonitorEvent latest = replayed.get(0);
+        assertEquals(TEST_EVENT_TOPIC, latest.topic);
+        assertEquals("bam", latest.eventData.get("message"));
+    }
+    
+    /**
+     * Checks the minimum and maximum retention configured for a single topic.
+     * <p>
+     * The test topic is configured with a minimum of 3 and a maximum of 5
+     * stored events, and the <code>*</code> filter with a minimum of 0 and a
+     * maximum of <code>Integer.MAX_VALUE</code>. The test then asserts, in
+     * order:
+     * <ol>
+     * <li>after six events on the test topic, only the last five are
+     * replayed;</li>
+     * <li>after delivering <code>getMaximumEventStorage()</code> events of a
+     * second type, the replay holds exactly that many events and the three
+     * most recent test-topic events are still among them;</li>
+     * <li>after reconfiguring the test topic to a minimum of 1 and a maximum
+     * of 2, the replay holds one event fewer and only the two most recent
+     * test-topic events remain.</li>
+     * </ol>
+     */
+    @WithConfiguration(pid = "org.apache.aries.typedevent.bus",
+            properties = @Property(key = "event.history.enable.at.start", value = "false", scalar = Scalar.Boolean))
+    @Test
+    public void testMinimumRetention(@InjectService TypedEventMonitor monitor,
+            @InjectService TypedEventBus eventBus) throws Exception {
+
+        monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 3, 5);
+        monitor.configureHistoryStorage("*", 0, Integer.MAX_VALUE);
+
+        // Six events on a topic that keeps at most five
+        for (String message : Arrays.asList("boo", "bam", "boom", "foo", "bar", "foobar")) {
+            TestEvent sent = new TestEvent();
+            sent.message = message;
+            eventBus.deliver(sent);
+        }
+
+        Thread.sleep(500);
+
+        List<MonitorEvent> replayed = monitor.monitorEvents(6, true)
+                .collect(Collectors.toList())
+                .timeout(2000)
+                .getValue();
+        assertEquals(5, replayed.size(), replayed.toString());
+
+        // Topics are checked for every entry before any message is checked
+        for (int i = 0; i < 5; i++) {
+            assertEquals(TEST_EVENT_TOPIC, replayed.get(i).topic);
+        }
+
+        String[] newestFive = { "bam", "boom", "foo", "bar", "foobar" };
+        for (int i = 0; i < newestFive.length; i++) {
+            assertEquals(newestFive[i], replayed.get(i).eventData.get("message"));
+        }
+
+        // Fill the history with events of a second type
+        long capacity = monitor.getMaximumEventStorage();
+        for (long n = 0; n < capacity; n++) {
+            TestEvent nested = new TestEvent();
+            nested.message = "Hello " + n;
+
+            TestEvent2 filler = new TestEvent2();
+            filler.eventType = EventType.GREEN;
+            filler.subEvent = nested;
+            eventBus.deliver(filler);
+        }
+
+        Thread.sleep(500);
+
+        replayed = monitor.monitorEvents((int) (capacity + 100), true)
+                .collect(Collectors.toList())
+                .timeout(2000)
+                .getValue();
+        assertEquals(capacity, replayed.size(), replayed.toString());
+
+        List<MonitorEvent> retained = replayed.stream()
+                .filter(candidate -> candidate.topic.equals(TEST_EVENT_TOPIC))
+                .collect(toList());
+        assertEquals(3, retained.size(), retained.toString());
+        assertEquals("foo", retained.get(0).eventData.get("message"));
+        assertEquals("bar", retained.get(1).eventData.get("message"));
+        assertEquals("foobar", retained.get(2).eventData.get("message"));
+
+        // Lower both limits for the test topic
+        monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 1, 2);
+
+        replayed = monitor.monitorEvents((int) capacity + 100, true)
+                .collect(Collectors.toList())
+                .timeout(2000)
+                .getValue();
+        assertEquals(capacity - 1, replayed.size(), replayed.toString());
+
+        retained = replayed.stream()
+                .filter(candidate -> candidate.topic.equals(TEST_EVENT_TOPIC))
+                .collect(toList());
+        assertEquals(2, retained.size(), retained.toString());
+        assertEquals("bar", retained.get(0).eventData.get("message"));
+        assertEquals("foobar", retained.get(1).eventData.get("message"));
+    }
 }
diff --git a/org.apache.aries.typedevent.bus/test.bndrun b/org.apache.aries.typedevent.bus/test.bndrun
index b8cbb94..86dda27 100644
--- a/org.apache.aries.typedevent.bus/test.bndrun
+++ b/org.apache.aries.typedevent.bus/test.bndrun
@@ -19,7 +19,7 @@
 
 -runfw: org.apache.felix.framework
 
--runee: JavaSE-11
+-runee: JavaSE-17
 
 -runrequires: bnd.identity;id="org.apache.aries.typedevent.bus",\
   bnd.identity;id="org.apache.aries.typedevent.bus-tests",\
@@ -48,6 +48,7 @@
        org.mockito.mockito-core;version='[5.5.0,5.5.1)',\
        org.osgi.test.common;version='[1.2.1,1.2.2)',\
        org.osgi.test.junit5;version='[1.2.1,1.2.2)',\
+       org.osgi.test.junit5.cm;version='[1.2.1,1.2.2)',\
        ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
        ch.qos.logback.core;version='[1.2.3,1.2.4)',\
        org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
diff --git a/typedevent-test-bom/pom.xml b/typedevent-test-bom/pom.xml
index 808c481..76285ae 100644
--- a/typedevent-test-bom/pom.xml
+++ b/typedevent-test-bom/pom.xml
@@ -45,6 +45,12 @@
                 <version>1.2.1</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.osgi</groupId>
+                <artifactId>org.osgi.test.junit5.cm</artifactId>
+                <version>1.2.1</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>org.apache.felix.framework</artifactId>


Reply via email to