This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/feature/v1.1 by this push:
new 6f5754a Include support for history-only monitoring streams
6f5754a is described below
commit 6f5754a498857cf3f7bb0a34ccadd4792a89d673
Author: Tim Ward <[email protected]>
AuthorDate: Wed Sep 13 12:38:09 2023 +0100
Include support for history-only monitoring streams
Signed-off-by: Tim Ward <[email protected]>
---
.../typedevent/bus/impl/TypedEventMonitorImpl.java | 33 ++++--
.../org/osgi/service/typedevent/TypedEventBus.java | 3 +
.../service/typedevent/TypedEventPublisher.java | 2 +
.../typedevent/monitor/TypedEventMonitor.java | 35 +++++-
.../bus/osgi/TypedEventMonitorIntegrationTest.java | 118 +++++++++++++++++++++
5 files changed, 182 insertions(+), 9 deletions(-)
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
index 1d177fe..0e2d0b7 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
@@ -100,7 +100,12 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
@Override
public PushStream<MonitorEvent> monitorEvents(int history) {
- return psp.buildStream(eventSource(history))
+ return monitorEvents(history, false);
+ }
+
+ @Override
+ public PushStream<MonitorEvent> monitorEvents(int history, boolean
historyOnly) {
+ return psp.buildStream(eventSource(history, historyOnly))
.withBuffer(new ArrayBlockingQueue<>(Math.max(historySize,
history)))
.withPushbackPolicy(PushbackPolicyOption.FIXED,
0).withQueuePolicy(QueuePolicyOption.FAIL)
.withExecutor(monitoringWorker).build();
@@ -108,12 +113,17 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
@Override
public PushStream<MonitorEvent> monitorEvents(Instant history) {
- return psp.buildStream(eventSource(history)).withBuffer(new
ArrayBlockingQueue<>(1024))
+ return monitorEvents(history, false);
+ }
+
+ @Override
+ public PushStream<MonitorEvent> monitorEvents(Instant history, boolean
historyOnly) {
+ return psp.buildStream(eventSource(history,
historyOnly)).withBuffer(new ArrayBlockingQueue<>(1024))
.withPushbackPolicy(PushbackPolicyOption.FIXED,
0).withQueuePolicy(QueuePolicyOption.FAIL)
.withExecutor(monitoringWorker).build();
}
- PushEventSource<MonitorEvent> eventSource(int events) {
+ PushEventSource<MonitorEvent> eventSource(int events, boolean historyOnly)
{
return pec -> {
List<MonitorEvent> list;
@@ -125,11 +135,11 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
} finally {
lock.readLock().unlock();
}
- return pushBackwards(pec, list);
+ return pushBackwards(pec, list, historyOnly);
};
}
- PushEventSource<MonitorEvent> eventSource(Instant since) {
+ PushEventSource<MonitorEvent> eventSource(Instant since, boolean
historyOnly) {
return pec -> {
List<MonitorEvent> list = new ArrayList<>();
@@ -147,11 +157,11 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
} finally {
lock.readLock().unlock();
}
- return pushBackwards(pec, list);
+ return pushBackwards(pec, list, historyOnly);
};
}
- private AutoCloseable pushBackwards(PushEventConsumer<? super
MonitorEvent> pec, List<MonitorEvent> list)
+ private AutoCloseable pushBackwards(PushEventConsumer<? super
MonitorEvent> pec, List<MonitorEvent> list, boolean historyOnly)
throws Exception {
ListIterator<MonitorEvent> li = list.listIterator(list.size());
while (li.hasPrevious()) {
@@ -165,7 +175,14 @@ public class TypedEventMonitorImpl implements
TypedEventMonitor {
};
}
}
- return source.open(pec);
+ if(historyOnly) {
+ try {
+ pec.accept(PushEvent.close());
+ } catch (Exception e) {}
+ return () -> {};
+ } else {
+ return source.open(pec);
+ }
}
<T> T copyOfHistory(Function<Stream<MonitorEvent>, T> events) {
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
index a73d508..ec89cbe 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
@@ -84,6 +84,7 @@ public interface TypedEventBus {
* {@link TypedEventPublisher}
* @return A {@link TypedEventPublisher} that will publish events to the
* topic name automatically generated from {@code eventType}
+ * @since 1.1
*/
<T> TypedEventPublisher<T> createPublisher(Class<T> eventType);
@@ -98,6 +99,7 @@ public interface TypedEventBus {
* {@link TypedEventPublisher}
* @return A {@link TypedEventPublisher} that will publish events to the
* supplied topic
+ * @since 1.1
*/
<T> TypedEventPublisher<T> createPublisher(String topic,
Class<T> eventType);
@@ -108,6 +110,7 @@ public interface TypedEventBus {
* @param topic The topic to publish events to
* @return A {@link TypedEventPublisher} that will publish events to the
* supplied topic
+ * @since 1.1
*/
TypedEventPublisher<Object> createPublisher(String topic);
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
index f7c7b90..fc306e3 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
@@ -35,7 +35,9 @@ import org.osgi.annotation.versioning.ProviderType;
* @ThreadSafe
* @author $Id: cace0c5ed2b2e0d6abdf96f4a30e24b9c1610eed $
* @param <T> The type of events that may be sent using this publisher
+ * @since 1.1
*/
+
@ProviderType
public interface TypedEventPublisher<T> extends AutoCloseable {
/**
diff --git
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
index a366b24..37a0372 100644
---
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
+++
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
@@ -42,6 +42,8 @@ public interface TypedEventMonitor {
/**
* Get a stream of events, including up to the requested number of
* historical data events.
+ * <p>
+ * Logically equivalent to <code>monitorEvents(history, false)</code>.
*
* @param history The requested number of historical events, note that
fewer
* than this number of events may be returned if history is
@@ -50,9 +52,25 @@ public interface TypedEventMonitor {
*/
PushStream<MonitorEvent> monitorEvents(int history);
+ /**
+ * Get a stream of events, including up to the requested number of
+ * historical data events.
+ *
+ * @param history The requested number of historical events, note that
fewer
+ * than this number of events may be returned if history is
+ * unavailable, or if insufficient events have been sent.
+ * @param historyOnly If <code>true</code> then the returned stream
will be
+ * closed as soon as the available history has been delivered
+ * @return A stream of event data
+ * @since 1.1
+ */
+ PushStream<MonitorEvent> monitorEvents(int history, boolean
historyOnly);
+
/**
* Get a stream of events, including historical data events prior to the
- * supplied time
+ * supplied time.
+ * <p>
+ * Logically equivalent to <code>monitorEvents(history, false)</code>.
*
* @param history The requested time after which historical events,
should
* be included. Note that events may have been discarded, or
@@ -61,4 +79,19 @@ public interface TypedEventMonitor {
*/
PushStream<MonitorEvent> monitorEvents(Instant history);
+ /**
+ * Get a stream of events, including historical data events sent after the
+ * supplied time.
+ *
+ * @param history The requested time after which historical events
should
+ * be included. Note that events may have been discarded, or
+ * history unavailable.
+ * @param historyOnly If <code>true</code> then the returned stream
will be
+ * closed as soon as the available history has been delivered
+ * @return A stream of event data
+ * @since 1.1
+ */
+ PushStream<MonitorEvent> monitorEvents(Instant history,
+ boolean historyOnly);
+
}
diff --git
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
index ee630df..8632823 100644
---
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
+++
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -226,6 +226,58 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
assertEquals("bam", events.get(0).eventData.get("message"));
+ }
+
+ /**
+ * Tests that event history is delivered to the monitor and that it
+ * closes the stream once the available history has been delivered
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitorHistory1Close(@InjectService
TypedEventMonitor monitor,
+ @InjectService TypedEventBus eventBus) throws
InterruptedException, InvocationTargetException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ eventBus.deliver(event);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+ Thread.sleep(500);
+
+ Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents(5,
true)
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ List<MonitorEvent> events = eventsPromise.getValue();
+
+ assertEquals(2, events.size(), events.toString());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(1, true)
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(1, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+ assertEquals("bam", events.get(0).eventData.get("message"));
+
+
}
/**
@@ -306,4 +358,70 @@ public class TypedEventMonitorIntegrationTest extends
AbstractIntegrationTest {
assertTrue(events.isEmpty());
}
+ /**
+ * Tests that event history is delivered to the monitor and the stream then closes
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitorHistory2Close(@InjectService
TypedEventMonitor monitor,
+ @InjectService TypedEventBus eventBus) throws
InterruptedException, InvocationTargetException {
+
+ Instant beforeFirst = Instant.now().minus(Duration.ofMillis(500));
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ eventBus.deliver(event);
+
+ Instant afterFirst = Instant.now().plus(Duration.ofMillis(500));
+
+ Thread.sleep(1000);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+ Instant afterSecond = Instant.now().plus(Duration.ofMillis(500));
+
+ Thread.sleep(600);
+
+ // No stream time limit, this stream should auto-close
+ Promise<List<MonitorEvent>> eventsPromise =
monitor.monitorEvents(beforeFirst, true)
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ List<MonitorEvent> events = eventsPromise.getValue();
+
+ assertEquals(2, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(afterFirst, true)
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(1, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+ assertEquals("bam", events.get(0).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(afterSecond, true)
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertTrue(events.isEmpty());
+ }
+
}