This is an automated email from the ASF dual-hosted git repository. timothyjward pushed a commit to branch feature/v1.1 in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
commit e6d4fdcd47e9a2d1e835f780107917962240b3f7 Author: Tim Ward <[email protected]> AuthorDate: Wed Mar 6 12:00:25 2024 +0000 Add support for Event History Configuration This commit updates the TypedEventMonitor API and implements the necessary history configuration methods. There are tests demonstrating that history can be dynamically reconfigured on a per-topic basis Signed-off-by: Tim Ward <[email protected]> --- org.apache.aries.typedevent.bus/pom.xml | 5 +- .../aries/typedevent/bus/impl/EventSelector.java | 82 +++++++- .../aries/typedevent/bus/impl/TopicHistory.java | 73 +++++++ .../typedevent/bus/impl/TypedEventBusImpl.java | 4 +- .../typedevent/bus/impl/TypedEventMonitorImpl.java | 210 ++++++++++++++++++++- .../typedevent/monitor/TypedEventMonitor.java | 156 +++++++++++++++ .../bus/osgi/TypedEventMonitorIntegrationTest.java | 165 ++++++++++++++++ org.apache.aries.typedevent.bus/test.bndrun | 3 +- typedevent-test-bom/pom.xml | 6 + 9 files changed, 690 insertions(+), 14 deletions(-) diff --git a/org.apache.aries.typedevent.bus/pom.xml b/org.apache.aries.typedevent.bus/pom.xml index 2ee877a..fc6c662 100644 --- a/org.apache.aries.typedevent.bus/pom.xml +++ b/org.apache.aries.typedevent.bus/pom.xml @@ -82,6 +82,10 @@ <groupId>org.osgi</groupId> <artifactId>org.osgi.test.junit5</artifactId> </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.test.junit5.cm</artifactId> + </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> @@ -114,7 +118,6 @@ <groupId>org.osgi</groupId> <artifactId>org.osgi.util.function</artifactId> <version>1.1.0</version> - <scope>runtime</scope> </dependency> <dependency> <groupId>org.osgi</groupId> diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java index 39ce712..ca3f6ad 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventSelector.java @@ -23,8 +23,10 @@ import java.util.function.Predicate; import org.osgi.framework.Filter; -public class EventSelector { +public class EventSelector implements Comparable<EventSelector> { + private final String topicFilter; + /** The event filter **/ private final Filter filter; @@ -62,6 +64,7 @@ public class EventSelector { * @param filter */ public EventSelector(String topic, Filter filter) { + this.topicFilter = topic; this.filter = filter; if(topic == null) { @@ -120,6 +123,10 @@ public class EventSelector { // Must match the topic, and the filter if set return topicMatcher.test(topic) && (filter == null || filter.matches(event)); } + + public boolean matchesTopic(String topic) { + return topicMatcher.test(topic); + } private boolean topicMatch(String topic) { @@ -159,4 +166,77 @@ public class EventSelector { public String getInitial() { return initial; } + + /** + * Get the topic filter + * @return + */ + public String getTopicFilter() { + return topicFilter; + } + + public boolean isWildcard() { + return isMultiLevelWildcard || !additionalSegments.isEmpty(); + } + + @Override + public int compareTo(EventSelector o) { + if(isWildcard()) { + if(!o.isWildcard()) { + return 1; + } + } else if(o.isWildcard()) { + return -1; + } else { + return initial.compareTo(o.initial); + } + + int compare = tokenCount(o.initial) - tokenCount(initial); + + for(int i = 0; compare == 0 && i < additionalSegments.size(); i++) { + if(o.additionalSegments.size() > i) { + compare = tokenCount(o.additionalSegments.get(i)) - tokenCount(additionalSegments.get(i)); + } else { + // other is out of segments before we are + return 1; + } + } + + if(compare == 0) { + + if(o.additionalSegments.size() > additionalSegments.size()) { + return -1; + } + + if(isMultiLevelWildcard) { + if(!o.isMultiLevelWildcard) { + return 1; + } + } else if(o.isMultiLevelWildcard) { + return -1; + } + + compare = initial.compareTo(o.initial); + + for(int i = 0; compare == 0 && i < additionalSegments.size(); i++) { + compare = additionalSegments.get(i).compareTo(o.additionalSegments.get(i)); + } + } + + return compare; + } + + private int tokenCount(String s) { + int count; + if("/".equals(s)) { + count = 0; + } else { + count = 1; + int idx = 0; + while((idx = s.indexOf('/', idx + 1)) > 0) { + count++; + } + } + return count; + } } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java new file mode 100644 index 0000000..8e5703e --- /dev/null +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.aries.typedevent.bus.impl; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.osgi.service.typedevent.monitor.MonitorEvent; + +public class TopicHistory { + private final int minRequired; + private final int maxRequired; + // Oldest first + private final Deque<MonitorEvent> events; + + public TopicHistory(int minRequired, int maxRequired) { + this.minRequired = minRequired; + this.maxRequired = maxRequired; + events = new ArrayDeque<MonitorEvent>(maxRequired); + } + + public MonitorEvent addEvent(MonitorEvent event) { + MonitorEvent toRemove = events.size() == maxRequired ? events.removeFirst() : null; + events.offerLast(event); + return toRemove; + } + + public boolean clearEvent(MonitorEvent event) { + if(events.size() > minRequired) { + MonitorEvent last = events.remove(); + if(last != event) { + // TODO log a warning? + } + return true; + } else { + return false; + } + } + + public boolean policyMatches(Entry<Integer, Integer> policy) { + return policy.getKey().intValue() == minRequired && policy.getValue().intValue() == maxRequired; + } + + public List<MonitorEvent> copyFrom(TopicHistory oldHistory) { + List<MonitorEvent> list = new ArrayList<MonitorEvent>(oldHistory.events); + + int toCopy = Math.min(list.size(), maxRequired - events.size()); + int newSize = list.size() - toCopy; + for(int i = 0 ; i < toCopy; i++) { + events.offerLast(list.remove(newSize)); + } + return list; + } +} \ No newline at end of file diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java index 1a73d73..2f7d360 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java @@ -526,14 +526,14 @@ public class TypedEventBusImpl implements TypedEventBus { return list; } - private static void checkTopicSyntax(String topic) { + static void checkTopicSyntax(String topic) { String msg = checkTopicSyntax(topic, false); if(msg != null) { throw new IllegalArgumentException(msg); } } - private static String checkTopicSyntax(String topic, boolean wildcardPermitted) { + static String checkTopicSyntax(String topic, boolean wildcardPermitted) { if(topic == null) { throw new IllegalArgumentException("The topic name is not permitted to be null"); diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java index 0e2d0b7..4909206 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java @@ -18,17 +18,24 @@ package org.apache.aries.typedevent.bus.impl; import java.time.Instant; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -36,6 +43,7 @@ import org.osgi.annotation.bundle.Capability; import org.osgi.namespace.service.ServiceNamespace; import org.osgi.service.typedevent.monitor.MonitorEvent; import org.osgi.service.typedevent.monitor.TypedEventMonitor; +import org.osgi.util.function.Predicate; import org.osgi.util.pushstream.PushEvent; import org.osgi.util.pushstream.PushEventConsumer; import org.osgi.util.pushstream.PushEventSource; @@ -48,6 +56,8 @@ import org.osgi.util.pushstream.SimplePushEventSource; @Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, attribute = "objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor", uses = TypedEventMonitor.class) public class TypedEventMonitorImpl implements TypedEventMonitor { + private static final Entry<Integer, Integer> EMPTY = new SimpleImmutableEntry<>(0,0); + private final LinkedList<MonitorEvent> historicEvents = new LinkedList<MonitorEvent>(); private final ExecutorService monitoringWorker; @@ -59,9 +69,18 @@ public class TypedEventMonitorImpl implements TypedEventMonitor { private final SimplePushEventSource<MonitorEvent> source; private final int historySize = 1024; + + private final SortedMap<EventSelector, Entry<Integer, Integer>> historyConfiguration = new TreeMap<>(); + + private final Map<String, TopicHistory> topicsWithRestrictedHistories = new HashMap<>(); TypedEventMonitorImpl(Map<String, ?> props) { + Object object = props.get("event.history.enable.at.start"); + if(object == null || "true".equals(object.toString())) { + historyConfiguration.put(new EventSelector("*", null), new SimpleImmutableEntry<>(0, Integer.MAX_VALUE)); + } + monitoringWorker = Executors.newCachedThreadPool(); psp = new PushStreamProvider(); @@ -75,24 +94,57 @@ public class TypedEventMonitorImpl implements TypedEventMonitor { } void event(String topic, Map<String, Object> eventData) throws InterruptedException { - MonitorEvent me = new MonitorEvent(); - me.eventData = eventData; - me.topic = topic; - me.publicationTime = Instant.now(); + MonitorEvent me = null; lock.writeLock().lockInterruptibly(); try { - historicEvents.addFirst(me); - int toRemove = historicEvents.size() - historySize; - for (; toRemove > 0; toRemove--) { - historicEvents.removeLast(); + Entry<Integer, Integer> policy = doGetEffectiveHistoryStorage(topic); + + if(policy.getValue() > 0) { + me = getMonitorEvent(topic, eventData); + + historicEvents.addFirst(me); + + if(policy.getValue() < historySize) { + TopicHistory th = topicsWithRestrictedHistories.computeIfAbsent(topic, + t -> new TopicHistory(policy.getKey(), policy.getValue())); + MonitorEvent old = th.addEvent(me); + if(old != null) { + historicEvents.remove(old); + } + } + + + int toRemove = historicEvents.size() - historySize; + if(toRemove > 0) { + Iterator<MonitorEvent> it = historicEvents.descendingIterator(); + for (; toRemove > 0 && it.hasNext();) { + MonitorEvent toCheck = it.next(); + TopicHistory th = topicsWithRestrictedHistories.get(toCheck.topic); + if(th == null || th.clearEvent(toCheck)) { + it.remove(); + toRemove--; + } + } + } } } finally { lock.writeLock().unlock(); } - source.publish(me); + + if(source.isConnected()) { + source.publish(me == null? getMonitorEvent(topic, eventData) : me); + } } + private MonitorEvent getMonitorEvent(String topic, Map<String, Object> eventData) { + MonitorEvent me = new MonitorEvent(); + me.eventData = eventData; + me.topic = topic; + me.publicationTime = Instant.now(); + return me; + } + @Override public PushStream<MonitorEvent> monitorEvents() { return monitorEvents(0); @@ -196,4 +248,144 @@ public class TypedEventMonitorImpl implements TypedEventMonitor { lock.readLock().unlock(); } } + + @Override + public Predicate<String> topicFilterMatches(String topicFilter) { + TypedEventBusImpl.checkTopicSyntax(topicFilter, true); + EventSelector selector = new EventSelector(topicFilter, null); + return selector::matchesTopic; + } + + @Override + public boolean topicFilterMatches(String topicName, String topicFilter) { + TypedEventBusImpl.checkTopicSyntax(topicFilter, true); + TypedEventBusImpl.checkTopicSyntax(topicName); + EventSelector selector = new EventSelector(topicFilter, null); + return selector.matchesTopic(topicName); + } + + @Override + public long getMaximumEventStorage() { + return historySize; + } + + @Override + public Map<String, Entry<Integer, Integer>> getConfiguredHistoryStorage() { + Map<String, Entry<Integer, Integer>> copy = new LinkedHashMap<>(); + lock.readLock().lock(); + try { + for (Entry<EventSelector, Entry<Integer, Integer>> e : historyConfiguration.entrySet()) { + copy.put(e.getKey().getTopicFilter(), e.getValue()); + } + } finally { + lock.readLock().unlock(); + } + return copy; + } + + @Override + public Entry<Integer, Integer> getConfiguredHistoryStorage(String topicFilter) { + TypedEventBusImpl.checkTopicSyntax(topicFilter, true); + EventSelector selector = new EventSelector(topicFilter, null); + lock.readLock().lock(); + try { + return historyConfiguration.get(selector); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Entry<Integer, Integer> getEffectiveHistoryStorage(String topicName) { + TypedEventBusImpl.checkTopicSyntax(topicName); + lock.readLock().lock(); + try { + return doGetEffectiveHistoryStorage(topicName); + } finally { + lock.readLock().unlock(); + } + } + + private Entry<Integer, Integer> doGetEffectiveHistoryStorage(String topicName) { + return historyConfiguration.entrySet().stream() + .filter(e -> e.getKey().matchesTopic(topicName)) + .map(Entry::getValue) + .findFirst() + .orElse(EMPTY); + } + + @Override + public int configureHistoryStorage(String topicFilter, int minRequired, int maxRequired) { + + if(minRequired < 0 || maxRequired < 0) { + throw new IllegalArgumentException("The minimum and maxium stored events must be greater than zero"); + } + if(minRequired > maxRequired) { + throw new IllegalArgumentException("The minimum stored events must not be greater than the maximum stored events"); + } + + if(minRequired > 0) { + TypedEventBusImpl.checkTopicSyntax(topicFilter); + } else { + TypedEventBusImpl.checkTopicSyntax(topicFilter, true); + } + + EventSelector key = new EventSelector(topicFilter, null); + Entry<Integer, Integer> val = new SimpleImmutableEntry<>(minRequired, maxRequired); + long available; + lock.writeLock().lock(); + try { + available = historySize - historyConfiguration.entrySet().stream() + .filter(e -> !e.getKey().getTopicFilter().equals(topicFilter)) + .mapToLong(e -> e.getValue().getKey()).sum(); + if(available < minRequired) { + throw new IllegalStateException("Insufficient space available for " + minRequired + " events"); + } + + Entry<Integer, Integer> old = historyConfiguration.put(key, val); + if(key.isWildcard()) { + if(!val.equals(old)) { + + Consumer<String> action = (minRequired > 0 || maxRequired < historySize) ? + s -> updateRestrictedHistory(s, minRequired, maxRequired) : + topicsWithRestrictedHistories::remove; + for(Entry<String, TopicHistory> e : topicsWithRestrictedHistories.entrySet()) { + if(key.matchesTopic(e.getKey())) { + Entry<Integer, Integer> policy = getEffectiveHistoryStorage(e.getKey()); + if(!e.getValue().policyMatches(policy)) { + action.accept(e.getKey()); + } + } + } + } + } else if(minRequired > 0 || maxRequired < historySize){ + updateRestrictedHistory(topicFilter, minRequired, maxRequired); + } else { + topicsWithRestrictedHistories.remove(topicFilter); + } + } finally { + lock.writeLock().unlock(); + } + return (int) Math.min(maxRequired, available); + } + + private void updateRestrictedHistory(String topicFilter, int minRequired, int maxRequired) { + TopicHistory newHistory = new TopicHistory(minRequired, maxRequired); + TopicHistory oldHistory = topicsWithRestrictedHistories.put(topicFilter, newHistory); + if(oldHistory != null) { + List<MonitorEvent> toRemove = newHistory.copyFrom(oldHistory); + historicEvents.removeAll(toRemove); + } + } + + @Override + public void removeHistoryStorage(String topicFilter) { + EventSelector selector = new EventSelector(topicFilter, null); + lock.readLock().lock(); + try { + historyConfiguration.remove(selector); + } finally { + lock.readLock().unlock(); + } + } } diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java index 37a0372..5331125 100644 --- a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java +++ b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java @@ -18,8 +18,11 @@ package org.osgi.service.typedevent.monitor; import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.function.Predicate; import org.osgi.util.pushstream.PushStream; /** @@ -94,4 +97,157 @@ public interface TypedEventMonitor { PushStream<MonitorEvent> monitorEvents(Instant history, boolean historyOnly); + /** + * Get a Predicate which will match the supplied topic filter against a + * topic name. + * + * @param topicFilter The topic filter to match against + * @return A predicate that will return true if the topic name being tested + * matches the supplied topic filter. + * @since 1.1 + * @throws NullPointerException if the topic filter is <code>null</code> + * @throws IllegalArgumentException if the topic filter contains invalid + * syntax + */ + Predicate<String> topicFilterMatches(String topicFilter); + + /** + * Test the supplied topic filter against the supplied topic name. + * + * @param topicName The topic name to match against + * @param topicFilter The topic filter to match against + * @return A predicate that will return true if the topic name being tested + * matches the supplied topic filter. + * @since 1.1 + * @throws NullPointerException if the topic filter is <code>null</code> + * @throws IllegalArgumentException if the topic filter or topic name + * contain invalid syntax + */ + boolean topicFilterMatches(String topicName, String topicFilter); + + /** + * Get an estimate of the maximum number of historic events that can be + * stored by the TypedEvent implementation. If there is no fixed limit then + * -1 is returned. If no history storage is supported then zero is returned. + * + * @return The maximum number of historic events that can be stored. + * @since 1.1 + */ + long getMaximumEventStorage(); + + /** + * Get the configured history storage for the Typed Events implementation. + * <p> + * The returned {@link Map} uses topic filter strings as keys. These filter + * strings may contain wildcards. If multiple filter strings are able to + * match then the most specific match applies with the following ordering: + * <ol> + * <li>An exact topic match</li> + * <li>An exact match of the parent topic and a single level wildcard as the + * final token</li> + * <li>An exact match of the parent topic and multi-level wildcard as the + * final token</li> + * </ol> + * <p> + * This ordering is applied recursively starting with the first topic token + * until only one candidate remains. The keys in the returned map are + * ordered such that the first encountered key which matches a given topic + * name is the configuration that will apply to that topic name. + * <p> + * The value associated with each key is an {@link Entry} where the key is + * the minimum required number of stored events for the topic and the value + * is the maximum number of events that will be stored. + * + * @return The configured history storage + * @since 1.1 + */ + Map<String,Entry<Integer,Integer>> getConfiguredHistoryStorage(); + + /** + * Get the configured history storage for a given topic filter. This method + * looks for an exact match in the history configuration. If no + * configuration is found for the supplied topic filter then + * <code>null</code> is returned. + * + * @param topicFilter the topic filter + * @return An {@link Entry} where the key is the minimum required number of + * stored events for the topic and the value is the maximum number + * of events that will be stored. If no configuration is set for the + * topic filter then <code>null</code> will be returned. + * @throws NullPointerException if the topic filter is <code>null</code> + * @throws IllegalArgumentException if the topic filter contains invalid + * syntax + * @since 1.1 + */ + Entry<Integer,Integer> getConfiguredHistoryStorage(String topicFilter); + + /** + * Get the history storage rule that applies to a given topic name. This + * method takes into account the necessary precedence rules to find the + * correct configuration for the named method, and so will never return + * <code>null</code>. + * + * @param topicName the topic name + * @return An {@link Entry} where the key is the minimum required number of + * stored events for the topic and the value is the maximum number + * of events that will be stored. If no configuration is set for the + * topic filter then an entry with key and value set to zero will be + * returned. + * @throws NullPointerException if the topic name is <code>null</code> + * @throws IllegalArgumentException if the topic name contains invalid + * syntax or wildcards + * @since 1.1 + */ + Entry<Integer,Integer> getEffectiveHistoryStorage(String topicName); + + /** + * Configure history storage for a given topic filter. + * <p> + * Minimum storage settings may only be set for exact matches. It is an + * error to use a filter containing wildcards with a non-zero minimum + * history requirement. + * <p> + * If a minimum storage requirement is set then the Typed Events + * implementation must guarantee sufficient storage space to hold those + * events. If, after accounting for all other pre-existing miniumum storage + * requirements, there is insufficent storage left for this new + * configuration then an {@link IllegalStateException} must be thrown. + * + * @param topicFilter the topic filter + * @param minRequired the minimum number of historical events to keep + * available for this filter + * @param maxRequired the maximum number of historical events to keep + * available for this filter + * @return An int indicating the number of events that can be kept for this + * topic given the current configuration. This will always be at + * least <code>minRequired</code> and at most + * <code>maxRequired</code>. + * @throws NullPointerException if the topic filter is <code>null</code> + * @throws IllegalArgumentException if: + * <ul> + * <li>The topic filter contains invalid syntax</li> + * <li><code>minRequired</code> or <code>maxRequired</code> are + * less than <code>0</code>. + * <li>The topic filter contains wildcard(s) and + * <code>minRequired</code> is not <code>0</code>.</li> + * <li><code>minRequired</code> is greater than + * <code>maxRequired</code></li> + * @throws IllegalStateException if there is insufficient available space to + * provide the additional <code>minRequired</code> stored + * events. + * @since 1.1 + */ + int configureHistoryStorage(String topicFilter, int minRequired, + int maxRequired); + + /** + * Delete history storage configuration for a given topic filter. + * + * @param topicFilter the topic filter + * @throws NullPointerException if the topic filter is <code>null</code> + * @throws IllegalArgumentException if the topic filter contains invalid + * syntax + * @since 1.1 + */ + void removeHistoryStorage(String topicFilter); } diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java index 8632823..fed370a 100644 --- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java +++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.aries.typedevent.bus.osgi; +import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,9 +27,12 @@ import java.util.Arrays; import java.util.Dictionary; import java.util.Hashtable; import java.util.List; +import java.util.Map.Entry; import java.util.stream.Collectors; import org.apache.aries.typedevent.bus.common.TestEvent; +import org.apache.aries.typedevent.bus.common.TestEvent2; +import org.apache.aries.typedevent.bus.common.TestEvent2.EventType; import org.apache.aries.typedevent.bus.common.TestEventConsumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -45,6 +49,10 @@ import org.osgi.service.typedevent.monitor.MonitorEvent; import org.osgi.service.typedevent.monitor.TypedEventMonitor; import org.osgi.test.common.annotation.InjectBundleContext; import org.osgi.test.common.annotation.InjectService; +import org.osgi.test.common.annotation.Property; +import org.osgi.test.common.annotation.Property.Scalar; +import org.osgi.test.common.annotation.config.WithConfiguration; +import org.osgi.test.junit5.cm.ConfigurationExtension; import org.osgi.test.junit5.context.BundleContextExtension; import org.osgi.test.junit5.service.ServiceExtension; import org.osgi.util.promise.Promise; @@ -53,6 +61,7 @@ import org.osgi.util.promise.Promise; @ExtendWith(BundleContextExtension.class) @ExtendWith(ServiceExtension.class) @ExtendWith(MockitoExtension.class) +@ExtendWith(ConfigurationExtension.class) public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest { @Mock @@ -424,4 +433,160 @@ public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest { assertTrue(events.isEmpty()); } + + @WithConfiguration(pid = "org.apache.aries.typedevent.bus", properties = @Property(key = "event.history.enable.at.start", value = "false", scalar = Scalar.Boolean)) + @Test + public void testTopicConfig(@InjectService TypedEventMonitor monitor, + @InjectService TypedEventBus eventBus) throws Exception { + + Entry<Integer, Integer> historyStorage = monitor.getEffectiveHistoryStorage(TEST_EVENT_TOPIC); + assertEquals(0, historyStorage.getKey()); + assertEquals(0, historyStorage.getValue()); + + TestEvent event = new TestEvent(); + event.message = "boo"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "bam"; + + eventBus.deliver(event); + + Thread.sleep(500); + + Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents(5, true) + .collect(Collectors.toList()) + .timeout(2000); + + List<MonitorEvent> events = eventsPromise.getValue(); + assertTrue(events.isEmpty()); + + monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 0, 1); + + event = new TestEvent(); + event.message = "boo"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "bam"; + + eventBus.deliver(event); + + Thread.sleep(500); + + + eventsPromise = monitor.monitorEvents(5, true) + .collect(Collectors.toList()) + .timeout(2000); + + events = eventsPromise.getValue(); + + assertEquals(1, events.size(), events.toString()); + + assertEquals(TEST_EVENT_TOPIC, events.get(0).topic); + + assertEquals("bam", events.get(0).eventData.get("message")); + } + + @WithConfiguration(pid = "org.apache.aries.typedevent.bus", properties = @Property(key = "event.history.enable.at.start", value = "false", scalar = Scalar.Boolean)) + @Test + public void testMinimumRetention(@InjectService TypedEventMonitor monitor, + @InjectService TypedEventBus eventBus) throws Exception { + + monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 3, 5); + monitor.configureHistoryStorage("*", 0, Integer.MAX_VALUE); + + TestEvent event = new TestEvent(); + event.message = "boo"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "bam"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "boom"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "foo"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "bar"; + + eventBus.deliver(event); + + event = new TestEvent(); + event.message = "foobar"; + + eventBus.deliver(event); + + Thread.sleep(500); + + Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents(6, true) + .collect(Collectors.toList()) + .timeout(2000); + + List<MonitorEvent> events = eventsPromise.getValue(); + assertEquals(5, events.size(), events.toString()); + + assertEquals(TEST_EVENT_TOPIC, events.get(0).topic); + assertEquals(TEST_EVENT_TOPIC, events.get(1).topic); + assertEquals(TEST_EVENT_TOPIC, events.get(2).topic); + assertEquals(TEST_EVENT_TOPIC, events.get(3).topic); + assertEquals(TEST_EVENT_TOPIC, events.get(4).topic); + + assertEquals("bam", events.get(0).eventData.get("message")); + assertEquals("boom", events.get(1).eventData.get("message")); + assertEquals("foo", events.get(2).eventData.get("message")); + assertEquals("bar", events.get(3).eventData.get("message")); + assertEquals("foobar", events.get(4).eventData.get("message")); + + long maximumEventStorage = monitor.getMaximumEventStorage(); + for(long i = 0; i < maximumEventStorage; i++) { + TestEvent2 event2 = new TestEvent2(); + event2.eventType = EventType.GREEN; + event2.subEvent = new TestEvent(); + event2.subEvent.message = "Hello " + i; + eventBus.deliver(event2); + } + + Thread.sleep(500); + + eventsPromise = monitor.monitorEvents((int) (maximumEventStorage + 100), true) + .collect(Collectors.toList()) + .timeout(2000); + events = eventsPromise.getValue(); + assertEquals(maximumEventStorage, events.size(), events.toString()); + + events = events.stream() + .filter(me -> me.topic.equals(TEST_EVENT_TOPIC)) + .collect(toList()); + assertEquals(3, events.size(), events.toString()); + assertEquals("foo", events.get(0).eventData.get("message")); + assertEquals("bar", events.get(1).eventData.get("message")); + assertEquals("foobar", events.get(2).eventData.get("message")); + + monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 1, 2); + + eventsPromise = monitor.monitorEvents((int) maximumEventStorage + 100, true) + .collect(Collectors.toList()) + .timeout(2000); + events = eventsPromise.getValue(); + assertEquals(maximumEventStorage - 1, events.size(), events.toString()); + + events = events.stream() + .filter(me -> me.topic.equals(TEST_EVENT_TOPIC)) + .collect(toList()); + assertEquals(2, events.size(), events.toString()); + assertEquals("bar", events.get(0).eventData.get("message")); + assertEquals("foobar", events.get(1).eventData.get("message")); + } } diff --git a/org.apache.aries.typedevent.bus/test.bndrun b/org.apache.aries.typedevent.bus/test.bndrun index b8cbb94..86dda27 100644 --- a/org.apache.aries.typedevent.bus/test.bndrun +++ b/org.apache.aries.typedevent.bus/test.bndrun @@ -19,7 +19,7 @@ -runfw: org.apache.felix.framework --runee: JavaSE-11 +-runee: JavaSE-17 -runrequires: bnd.identity;id="org.apache.aries.typedevent.bus",\ bnd.identity;id="org.apache.aries.typedevent.bus-tests",\ @@ -48,6 +48,7 @@ org.mockito.mockito-core;version='[5.5.0,5.5.1)',\ org.osgi.test.common;version='[1.2.1,1.2.2)',\ org.osgi.test.junit5;version='[1.2.1,1.2.2)',\ + org.osgi.test.junit5.cm;version='[1.2.1,1.2.2)',\ ch.qos.logback.classic;version='[1.2.3,1.2.4)',\ ch.qos.logback.core;version='[1.2.3,1.2.4)',\ org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\ diff --git a/typedevent-test-bom/pom.xml b/typedevent-test-bom/pom.xml index 808c481..76285ae 100644 --- a/typedevent-test-bom/pom.xml +++ b/typedevent-test-bom/pom.xml @@ -45,6 +45,12 @@ <version>1.2.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.test.junit5.cm</artifactId> + <version>1.2.1</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.felix</groupId> <artifactId>org.apache.felix.framework</artifactId>
