This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/feature/v1.1 by this push:
new ae53309 Update with latest specification draft changes
ae53309 is described below
commit ae53309dbed66d39895b51a3984a865b2a668c0c
Author: Tim Ward <[email protected]>
AuthorDate: Thu Mar 14 12:16:58 2024 +0000
Update with latest specification draft changes
Signed-off-by: Tim Ward <[email protected]>
---
.../aries/typedevent/bus/impl/TopicHistory.java | 6 +-
.../typedevent/bus/impl/TypedEventMonitorImpl.java | 70 +++++-----
.../service/typedevent/monitor/RangePolicy.java | 147 +++++++++++++++++++++
.../typedevent/monitor/TypedEventMonitor.java | 55 ++++----
.../aries/typedevent/bus/impl/RecordConverter.java | 2 +-
.../bus/osgi/TypedEventMonitorIntegrationTest.java | 16 +--
6 files changed, 213 insertions(+), 83 deletions(-)
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
index 4dbceff..da7d7bf 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TopicHistory.java
@@ -21,9 +21,9 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
-import java.util.Map.Entry;
import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.RangePolicy;
public class TopicHistory {
private final int minRequired;
@@ -55,8 +55,8 @@ public class TopicHistory {
}
}
- public boolean policyMatches(Entry<Integer, Integer> policy) {
- return policy.getKey().intValue() == minRequired &&
policy.getValue().intValue() == maxRequired;
+ public boolean policyMatches(RangePolicy policy) {
+ return policy.getMinimum() == minRequired &&
policy.getMaximum() == maxRequired;
}
public List<MonitorEvent> copyFrom(TopicHistory oldHistory) {
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
index 4909206..fe82347 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
@@ -18,7 +18,6 @@
package org.apache.aries.typedevent.bus.impl;
import java.time.Instant;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -42,6 +41,7 @@ import java.util.stream.Stream;
import org.osgi.annotation.bundle.Capability;
import org.osgi.namespace.service.ServiceNamespace;
import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.RangePolicy;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.util.function.Predicate;
import org.osgi.util.pushstream.PushEvent;
@@ -56,8 +56,6 @@ import org.osgi.util.pushstream.SimplePushEventSource;
@Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, attribute =
"objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor",
uses = TypedEventMonitor.class)
public class TypedEventMonitorImpl implements TypedEventMonitor {
- private static final Entry<Integer, Integer> EMPTY = new
SimpleImmutableEntry<>(0,0);
-
private final LinkedList<MonitorEvent> historicEvents = new
LinkedList<MonitorEvent>();
private final ExecutorService monitoringWorker;
@@ -70,7 +68,7 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
private final int historySize = 1024;
- private final SortedMap<EventSelector, Entry<Integer, Integer>>
historyConfiguration = new TreeMap<>();
+ private final SortedMap<EventSelector, RangePolicy> historyConfiguration =
new TreeMap<>();
private final Map<String, TopicHistory> topicsWithRestrictedHistories =
new HashMap<>();
@@ -78,7 +76,7 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
Object object = props.get("event.history.enable.at.start");
if(object == null || "true".equals(object.toString())) {
- historyConfiguration.put(new EventSelector("*", null), new
SimpleImmutableEntry<>(0, Integer.MAX_VALUE));
+ historyConfiguration.put(new EventSelector("*", null),
RangePolicy.unlimited());
}
monitoringWorker = Executors.newCachedThreadPool();
@@ -98,16 +96,16 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
MonitorEvent me = null;
lock.writeLock().lockInterruptibly();
try {
- Entry<Integer, Integer> policy =
doGetEffectiveHistoryStorage(topic);
+ RangePolicy policy = doGetEffectiveHistoryStorage(topic);
- if(policy.getValue() > 0) {
+ if(policy.getMaximum() > 0) {
me = getMonitorEvent(topic, eventData);
historicEvents.addFirst(me);
- if(policy.getValue() < historySize) {
+ if(policy.getMaximum() < historySize) {
TopicHistory th =
topicsWithRestrictedHistories.computeIfAbsent(topic,
- t -> new
TopicHistory(policy.getKey(), policy.getValue()));
+ t -> new
TopicHistory(policy.getMinimum(), policy.getMaximum()));
MonitorEvent old = th.addEvent(me);
if(old != null) {
historicEvents.remove(old);
@@ -265,16 +263,16 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
}
@Override
- public long getMaximumEventStorage() {
+ public int getMaximumEventStorage() {
return historySize;
}
@Override
- public Map<String, Entry<Integer, Integer>>
getConfiguredHistoryStorage() {
- Map<String, Entry<Integer, Integer>> copy = new
LinkedHashMap<>();
+ public Map<String, RangePolicy> getConfiguredHistoryStorage() {
+ Map<String, RangePolicy> copy = new LinkedHashMap<>();
lock.readLock().lock();
try {
- for (Entry<EventSelector, Entry<Integer, Integer>> e :
historyConfiguration.entrySet()) {
+ for (Entry<EventSelector, RangePolicy> e :
historyConfiguration.entrySet()) {
copy.put(e.getKey().getTopicFilter(),
e.getValue());
}
} finally {
@@ -284,7 +282,7 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
}
@Override
- public Entry<Integer, Integer> getConfiguredHistoryStorage(String
topicFilter) {
+ public RangePolicy getConfiguredHistoryStorage(String topicFilter) {
TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
EventSelector selector = new EventSelector(topicFilter, null);
lock.readLock().lock();
@@ -296,7 +294,7 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
}
@Override
- public Entry<Integer, Integer> getEffectiveHistoryStorage(String
topicName) {
+ public RangePolicy getEffectiveHistoryStorage(String topicName) {
TypedEventBusImpl.checkTopicSyntax(topicName);
lock.readLock().lock();
try {
@@ -306,67 +304,61 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
}
}
- private Entry<Integer, Integer> doGetEffectiveHistoryStorage(String
topicName) {
+ private RangePolicy doGetEffectiveHistoryStorage(String topicName) {
return historyConfiguration.entrySet().stream()
.filter(e -> e.getKey().matchesTopic(topicName))
.map(Entry::getValue)
.findFirst()
- .orElse(EMPTY);
+ .orElse(RangePolicy.none());
}
@Override
- public int configureHistoryStorage(String topicFilter, int minRequired,
int maxRequired) {
+ public int configureHistoryStorage(String topicFilter, RangePolicy
policy) {
- if(minRequired < 0 || maxRequired < 0) {
- throw new IllegalArgumentException("The minimum and
maxium stored events must be greater than zero");
- }
- if(minRequired > maxRequired) {
- throw new IllegalArgumentException("The minimum stored
events must not be greater than the maximum stored events");
- }
-
- if(minRequired > 0) {
+ if(policy.getMinimum() > 0) {
TypedEventBusImpl.checkTopicSyntax(topicFilter);
} else {
TypedEventBusImpl.checkTopicSyntax(topicFilter, true);
}
EventSelector key = new EventSelector(topicFilter, null);
- Entry<Integer, Integer> val = new
SimpleImmutableEntry<>(minRequired, maxRequired);
+ int min = policy.getMinimum();
+ int max = policy.getMaximum();
long available;
lock.writeLock().lock();
try {
available = historySize -
historyConfiguration.entrySet().stream()
.filter(e ->
!e.getKey().getTopicFilter().equals(topicFilter))
- .mapToLong(e ->
e.getValue().getKey()).sum();
- if(available < minRequired) {
- throw new IllegalStateException("Insufficient
space available for " + minRequired + " events");
+ .mapToInt(e ->
e.getValue().getMinimum()).sum();
+ if(available < min) {
+ throw new IllegalStateException("Insufficient
space available for " + min + " events");
}
- Entry<Integer, Integer> old =
historyConfiguration.put(key, val);
+ RangePolicy old = historyConfiguration.put(key, policy);
if(key.isWildcard()) {
- if(!val.equals(old)) {
+ if(old == null || old.getMinimum() != min ||
old.getMaximum() != max) {
- Consumer<String> action = (minRequired
> 0 || maxRequired < historySize) ?
- s ->
updateRestrictedHistory(s, minRequired, maxRequired) :
+ Consumer<String> action = (min > 0 ||
max < historySize) ?
+ s ->
updateRestrictedHistory(s, min, max) :
topicsWithRestrictedHistories::remove;
for(Entry<String, TopicHistory> e :
topicsWithRestrictedHistories.entrySet()) {
if(key.matchesTopic(e.getKey())) {
- Entry<Integer, Integer>
policy = getEffectiveHistoryStorage(e.getKey());
-
if(!e.getValue().policyMatches(policy)) {
+ RangePolicy
effectivePolicy = getEffectiveHistoryStorage(e.getKey());
+
if(!e.getValue().policyMatches(effectivePolicy)) {
action.accept(e.getKey());
}
}
}
}
- } else if(minRequired > 0 || maxRequired < historySize){
- updateRestrictedHistory(topicFilter,
minRequired, maxRequired);
+ } else if(min > 0 || max < historySize){
+ updateRestrictedHistory(topicFilter, min, max);
} else {
topicsWithRestrictedHistories.remove(topicFilter);
}
} finally {
lock.writeLock().unlock();
}
- return (int) Math.min(maxRequired, available);
+ return (int) Math.min(max, available);
}
private void updateRestrictedHistory(String topicFilter, int
minRequired, int maxRequired) {
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/RangePolicy.java
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/RangePolicy.java
new file mode 100644
index 0000000..8727258
--- /dev/null
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/RangePolicy.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * Copyright (c) Contributors to the Eclipse Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+
*******************************************************************************/
+package org.osgi.service.typedevent.monitor;
+
+import static java.lang.Integer.MAX_VALUE;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * A range policy determining how much history should be stored. The minimum
+ * defines a hard limit indicating the minimum number of historical events that
+ * must be kept. The maximum defines an additional limit capping the maximum
+ * number of events that should be kept by the implementation. Events between
+ * the minimum and maximum storage limits may be discarded at any time by the
+ * implementation.
+ *
+ * @ThreadSafe
+ * @author $Id: 9540667fed7c58ae4f8f11390c5291fcebbca596 $
+ * @since 1.1
+ */
+@ProviderType
+public final class RangePolicy {
+
+ private static final RangePolicy NONE = new
RangePolicy(0, 0);
+ private static final RangePolicy UNLIMITED = new
RangePolicy(0,
+ MAX_VALUE);
+
+ private final int min;
+ private final int max;
+
+ private RangePolicy(int min, int max) {
+ if (min < 0 || max < 0 || min > max) {
+ throw new IllegalArgumentException(String.format(
+ "The minimum %d and maximum %d must
both be greater than zero, and the minimum less than or equal to the maximum",
+ Integer.valueOf(min),
Integer.valueOf(max)));
+ }
+ this.min = min;
+ this.max = max;
+ }
+
+ /**
+ * Get the minimum storage requirement
+ * <p>
+ * This defines the minimum number of events that must be stored by the
+ * implementation. If at least <code>min</code> events have been sent
to a
+ * topic using this policy then there must be at least <code>min</code>
+ * events returned by the history.
+ *
+ * @return the minimum number of events that must be retained
+ */
+ public int getMinimum() {
+ return min;
+ }
+
+ /**
+ * Get the maximum storage requirement
+ * <p>
+ * This defines the maximum number of events that should be stored by
the
+ * implementation. The implementation must return at most
<code>max</code>
+ * events for a topic using this policy, and may return fewer subject
to the
+ * minimum constraint defined by {@link #getMinimum()}.
+ *
+ * @return the maximum number of events that should be retained
+ */
+ public int getMaximum() {
+ return max;
+ }
+
+ /**
+ * Create a range policy for the defined range
+ *
+ * @param min the minimum number of events that must be kept in history
+ * @param max the maximum number of events that may be kept in history
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy range(int min, int max) {
+ return new RangePolicy(min, max);
+ }
+
+ /**
+ * Create a range policy for an exact range. Equivalent to
+ * <code>RangePolicy.range(count, count)</code>
+ *
+ * @param count the number of events that must be kept in history
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy exact(int count) {
+ return new RangePolicy(count, count);
+ }
+
+ /**
+ * Create a range policy for a minimum history requirement. Equivalent
to
+ * <code>RangePolicy.range(min, Integer.MAX_VALUE)</code>
+ *
+ * @param min the minimum number of events that must be kept in history
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy atLeast(int min) {
+ return new RangePolicy(min, MAX_VALUE);
+ }
+
+ /**
+ * Create a range policy for a maximum history requirement. Equivalent
to
+ * <code>RangePolicy.range(0, max)</code>
+ *
+ * @param max the maximum number of events that may be kept in history
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy atMost(int max) {
+ return new RangePolicy(0, max);
+ }
+
+ /**
+ * Get a range policy which stores no events. Equivalent to
+ * <code>RangePolicy.range(0, 0)</code>
+ *
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy none() {
+ return NONE;
+ }
+
+ /**
+ * Get a range policy which may store unlimited events. Equivalent to
+ * <code>RangePolicy.range(0, Integer.MAX_VALUE)</code>
+ *
+ * @return A configured {@link RangePolicy}
+ */
+ public static RangePolicy unlimited() {
+ return UNLIMITED;
+ }
+}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
index 5331125..9c9c8a9 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
@@ -19,7 +19,6 @@ package org.osgi.service.typedevent.monitor;
import java.time.Instant;
import java.util.Map;
-import java.util.Map.Entry;
import org.osgi.annotation.versioning.ProviderType;
import org.osgi.util.function.Predicate;
@@ -30,7 +29,7 @@ import org.osgi.util.pushstream.PushStream;
* using the EventBus, and that are received from remote EventBus instances
*
* @ThreadSafe
- * @author $Id: 7f7a493225439de463d0ad550f4b6b206b54277c $
+ * @author $Id: 05a263f6544754849171431982a20dae9a2c2da4 $
*/
@ProviderType
public interface TypedEventMonitor {
@@ -133,7 +132,7 @@ public interface TypedEventMonitor {
* @return The maximum number of historic events that can be stored.
* @since 1.1
*/
- long getMaximumEventStorage();
+ int getMaximumEventStorage();
/**
* Get the configured history storage for the Typed Events
implementation.
@@ -154,14 +153,13 @@ public interface TypedEventMonitor {
* ordered such that the first encountered key which matches a given
topic
* name is the configuration that will apply to that topic name.
* <p>
- * The value associated with each key is an {@link Entry} where the key
is
- * the minimum required number of stored events for the topic and the
value
- * is the maximum number of events that will be stored.
+ * The value associated with each key is the {@link RangePolicy}
defining
+ * the number of events that will be stored.
*
* @return The configured history storage
* @since 1.1
*/
- Map<String,Entry<Integer,Integer>> getConfiguredHistoryStorage();
+ Map<String,RangePolicy> getConfiguredHistoryStorage();
/**
* Get the configured history storage for a given topic filter. This
method
@@ -170,16 +168,15 @@ public interface TypedEventMonitor {
* <code>null</code> is returned.
*
* @param topicFilter the topic filter
- * @return An {@link Entry} where the key is the minimum required
number of
- * stored events for the topic and the value is the maximum
number
- * of events that will be stored. If no configuration is set
for the
- * topic filter then <code>null</code> will be returned.
+ * @return The {@link RangePolicy} defining the number of stored events
for
+ * the topic. If no configuration is set for the topic filter
then
+ * <code>null</code> will be returned.
* @throws NullPointerException if the topic filter is <code>null</code>
* @throws IllegalArgumentException if the topic filter contains invalid
* syntax
* @since 1.1
*/
- Entry<Integer,Integer> getConfiguredHistoryStorage(String topicFilter);
+ RangePolicy getConfiguredHistoryStorage(String topicFilter);
/**
* Get the history storage rule that applies to a given topic name. This
@@ -188,17 +185,15 @@ public interface TypedEventMonitor {
* <code>null</code>.
*
* @param topicName the topic name
- * @return An {@link Entry} where the key is the minimum required
number of
- * stored events for the topic and the value is the maximum
number
- * of events that will be stored. If no configuration is set
for the
- * topic filter then an entry with key and value set to zero
will be
- * returned.
+ * @return The {@link RangePolicy} defining the number of stored events
for
+ * the topic. If no configuration is set for the topic filter
then
+ * an {@link RangePolicy#none()} will be returned
* @throws NullPointerException if the topic name is <code>null</code>
* @throws IllegalArgumentException if the topic name contains invalid
* syntax or wildcards
* @since 1.1
*/
- Entry<Integer,Integer> getEffectiveHistoryStorage(String topicName);
+ RangePolicy getEffectiveHistoryStorage(String topicName);
/**
* Configure history storage for a given topic filter.
@@ -214,31 +209,26 @@ public interface TypedEventMonitor {
* configuration then an {@link IllegalStateException} must be thrown.
*
* @param topicFilter the topic filter
- * @param minRequired the minimum number of historical events to keep
- * available for this filter
- * @param maxRequired the maximum number of historical events to keep
- * available for this filter
+ * @param policy the event retention policy to use
* @return An int indicating the number of events that can be kept for
this
* topic given the current configuration. This will always be at
- * least <code>minRequired</code> and at most
- * <code>maxRequired</code>.
+ * least <code>policy.getMin()</code> and at most
+ * <code>policy.getMax()</code>.
* @throws NullPointerException if the topic filter is <code>null</code>
* @throws IllegalArgumentException if:
* <ul>
* <li>The topic filter contains invalid syntax</li>
- * <li><code>minRequired</code> or <code>maxRequired</code>
are
- * less than <code>0</code>.
- * <li>The topic filter contains wildcard(s) and
- * <code>minRequired</code> is not <code>0</code>.</li>
- * <li><code>minRequired</code> is greater than
- * <code>maxRequired</code></li>
+ * <li>The topic filter contains wildcard(s) <em>and</em>
+ * <code>getMaximumEventStorage</code> is not
<code>-1</code>
+ * <em>and</em> <code>policy.getMin()</code> is not
+ * <code>0</code>.</li>
+ * </ul>
* @throws IllegalStateException if there is insufficient available
space to
* provide the additional <code>minRequired</code> stored
* events.
* @since 1.1
*/
- int configureHistoryStorage(String topicFilter, int minRequired,
- int maxRequired);
+ int configureHistoryStorage(String topicFilter, RangePolicy policy);
/**
* Delete history storage configuration for a given topic filter.
@@ -250,4 +240,5 @@ public interface TypedEventMonitor {
* @since 1.1
*/
void removeHistoryStorage(String topicFilter);
+
}
diff --git
a/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java
b/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java
index 1e32c3c..67fe266 100644
---
a/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java
+++
b/org.apache.aries.typedevent.bus/src/main/java16/org/apache/aries/typedevent/bus/impl/RecordConverter.java
@@ -30,7 +30,7 @@ import org.osgi.util.converter.ConverterFunction;
/**
* This class is responsible for converting Record events to and from their
- * "flattened" representations. This version runs on Java 17
+ * "flattened" representations. This version runs on Java 16 and above
*/
public class RecordConverter {
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
index fed370a..0b6ac31 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
-import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.aries.typedevent.bus.common.TestEvent;
@@ -46,6 +45,7 @@ import org.osgi.framework.BundleContext;
import org.osgi.service.typedevent.TypedEventBus;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.RangePolicy;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.test.common.annotation.InjectBundleContext;
import org.osgi.test.common.annotation.InjectService;
@@ -439,9 +439,9 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
public void testTopicConfig(@InjectService TypedEventMonitor monitor,
@InjectService TypedEventBus eventBus) throws Exception {
- Entry<Integer, Integer> historyStorage =
monitor.getEffectiveHistoryStorage(TEST_EVENT_TOPIC);
- assertEquals(0, historyStorage.getKey());
- assertEquals(0, historyStorage.getValue());
+ RangePolicy historyStorage =
monitor.getEffectiveHistoryStorage(TEST_EVENT_TOPIC);
+ assertEquals(0, historyStorage.getMinimum());
+ assertEquals(0, historyStorage.getMaximum());
TestEvent event = new TestEvent();
event.message = "boo";
@@ -462,7 +462,7 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
List<MonitorEvent> events = eventsPromise.getValue();
assertTrue(events.isEmpty());
- monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 0, 1);
+ monitor.configureHistoryStorage(TEST_EVENT_TOPIC,
RangePolicy.atMost(1));
event = new TestEvent();
event.message = "boo";
@@ -495,8 +495,8 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
public void testMinimumRetention(@InjectService TypedEventMonitor monitor,
@InjectService TypedEventBus eventBus) throws Exception {
- monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 3, 5);
- monitor.configureHistoryStorage("*", 0, Integer.MAX_VALUE);
+ monitor.configureHistoryStorage(TEST_EVENT_TOPIC, RangePolicy.range(3,
5));
+ monitor.configureHistoryStorage("*", RangePolicy.unlimited());
TestEvent event = new TestEvent();
event.message = "boo";
@@ -574,7 +574,7 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
assertEquals("bar", events.get(1).eventData.get("message"));
assertEquals("foobar", events.get(2).eventData.get("message"));
- monitor.configureHistoryStorage(TEST_EVENT_TOPIC, 1, 2);
+ monitor.configureHistoryStorage(TEST_EVENT_TOPIC, RangePolicy.range(1,
2));
eventsPromise = monitor.monitorEvents((int) maximumEventStorage + 100,
true)
.collect(Collectors.toList())